This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 20b2a2b  [feat] Add execute type to workflow (#9)
20b2a2b is described below

commit 20b2a2bdd1673a2ffaedfd48abc1fdfacc2d50e0
Author: Jay Chung <[email protected]>
AuthorDate: Sat Nov 12 10:44:09 2022 +0800

    [feat] Add execute type to workflow (#9)
    
    Up to now, we could only submit workflows in parallel
    mode. This patch gives users the ability to specify the
    execution type of a workflow.
---
 docs/source/concept.rst                           | 32 ++++++++++
 docs/source/config.rst                            | 72 ++++++++++++-----------
 src/pydolphinscheduler/configuration.py           |  3 +
 src/pydolphinscheduler/core/process_definition.py | 39 +++++++++++-
 src/pydolphinscheduler/default_config.yaml        |  3 +
 src/pydolphinscheduler/java_gateway.py            |  2 +-
 tests/core/test_process_definition.py             | 19 ++++++
 tests/utils/test_yaml_parser.py                   |  1 +
 8 files changed, 134 insertions(+), 37 deletions(-)

diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 9a9527d..de49c9c 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -90,6 +90,38 @@ Tenant is the user who run task command in machine or in 
virtual machine. it cou
 
    Make should tenant exists in target machine, otherwise it will raise an 
error when you try to run command
 
+Execution Type
+~~~~~~~~~~~~~~
+
+Decide how to run a process definition when it has multiple instances. When the process definition's
+schedule interval is too short, multiple instances may run at the same time. Use this
+parameter to control how those process definition instances are run. Currently there
+are four execution types:
+
+* ``parallel`` (default value): all instances are allowed to run even if the previous
+  instance has not finished.
+* ``serial_wait``: each instance waits for the previous instance to finish, and
+  all the waiting instances are executed in scheduling order.
+* ``serial_discard``: an instance is discarded (abandoned) if the previous instance
+  has not finished.
+* ``serial_priority``: each instance waits for the previous instance to finish,
+  and all the waiting instances are executed in process definition priority order.
+
+Parameter ``execution_type`` can be set in the following ways:
+
+* Direct assignment. Pick an execution type from above and assign it directly to parameter
+  ``execution_type``.
+
+  .. code-block:: python
+
+     pd = ProcessDefinition(
+         name="process-definition",
+         execution_type="parallel"
+     )
+
+* Via environment variables or the configuration file. For more detail about these settings,
+  see the :doc:`config` section.
+
 Tasks
 -----
 
diff --git a/docs/source/config.rst b/docs/source/config.rst
index 29a143d..3f7fff8 100644
--- a/docs/source/config.rst
+++ b/docs/source/config.rst
@@ -78,41 +78,43 @@ All Configurations in Environment Variables
 
 All environment variables as below, and you could modify their value via `Bash 
<by bash>`_ or `Python OS Module <by python os module>`_
 
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Variable Section | Variable Name                      | description          
                                                                                
              |
-+==================+====================================+====================================================================================================================+
-|                  | ``PYDS_JAVA_GATEWAY_ADDRESS``      | Default Java gateway 
address, will use its value when it is set.                                     
              |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|   Java Gateway   | ``PYDS_JAVA_GATEWAY_PORT``         | Default Java gateway 
port, will use its value when it is set.                                        
              |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java 
gateway auto convert, will use its value when it is set.                        
              |
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_USER_NAME``                 | Default user name, 
will use when user's ``name`` when does not specify.                            
                |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_USER_PASSWORD``             | Default user 
password, will use when user's ``password`` when does not specify.              
                      |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|   Default User   | ``PYDS_USER_EMAIL``                | Default user email, 
will use when user's ``email`` when does not specify.                           
               |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_USER_PHONE``                | Default user phone, 
will use when user's ``phone`` when does not specify.                           
               |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_USER_STATE``                | Default user state, 
will use when user's ``state`` when does not specify.                           
               |
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_WORKFLOW_PROJECT``          | Default workflow 
project name, will use its value when workflow does not specify the attribute 
``project``.        |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_WORKFLOW_TENANT``           | Default workflow 
tenant, will use its value when workflow does not specify the attribute 
``tenant``.               |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Default Workflow | ``PYDS_WORKFLOW_USER``             | Default workflow 
user, will use its value when workflow does not specify the attribute ``user``. 
                  |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_WORKFLOW_QUEUE``            | Default workflow 
queue, will use its value when workflow does not specify the attribute 
``queue``.                 |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_WORKFLOW_WORKER_GROUP``     | Default workflow 
worker group, will use its value when workflow does not specify the attribute 
``worker_group``.   |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_WORKFLOW_RELEASE_STATE``    | Default workflow 
release state, will use its value when workflow does not specify the attribute 
``release_state``. |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_WORKFLOW_TIME_ZONE``        | Default workflow 
worker group, will use its value when workflow does not specify the attribute 
``timezone``.       |
-+                  
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-|                  | ``PYDS_WORKFLOW_WARNING_TYPE``     | Default workflow 
warning type, will use its value when workflow does not specify the attribute 
``warning_type``.   |
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Variable Section | Variable Name                      | description          
                                                                                
               |
++==================+====================================+=====================================================================================================================+
+|                  | ``PYDS_JAVA_GATEWAY_ADDRESS``      | Default Java gateway 
address, will use its value when it is set.                                     
               |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|   Java Gateway   | ``PYDS_JAVA_GATEWAY_PORT``         | Default Java gateway 
port, will use its value when it is set.                                        
               |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java 
gateway auto convert, will use its value when it is set.                        
               |
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_USER_NAME``                 | Default user name, 
will use when user's ``name`` when does not specify.                            
                 |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_USER_PASSWORD``             | Default user 
password, will use when user's ``password`` when does not specify.              
                       |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|   Default User   | ``PYDS_USER_EMAIL``                | Default user email, 
will use when user's ``email`` when does not specify.                           
                |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_USER_PHONE``                | Default user phone, 
will use when user's ``phone`` when does not specify.                           
                |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_USER_STATE``                | Default user state, 
will use when user's ``state`` when does not specify.                           
                |
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_PROJECT``          | Default workflow 
project name, will use its value when workflow does not specify the attribute 
``project``.         |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_TENANT``           | Default workflow 
tenant, will use its value when workflow does not specify the attribute 
``tenant``.                |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Default Workflow | ``PYDS_WORKFLOW_USER``             | Default workflow 
user, will use its value when workflow does not specify the attribute ``user``. 
                   |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_QUEUE``            | Default workflow 
queue, will use its value when workflow does not specify the attribute 
``queue``.                  |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_WORKER_GROUP``     | Default workflow 
worker group, will use its value when workflow does not specify the attribute 
``worker_group``.    |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_RELEASE_STATE``    | Default workflow 
release state, will use its value when workflow does not specify the attribute 
``release_state``.  |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_TIME_ZONE``        | Default workflow 
time zone, will use its value when workflow does not specify the attribute 
``timezone``.           |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_WARNING_TYPE``     | Default workflow 
warning type, will use its value when workflow does not specify the attribute 
``warning_type``.    |
++                  
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_EXECUTION_TYPE``   | Default workflow 
execution type, will use its value when workflow does not specify the attribute 
``execution_type``.|
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
 
 .. note::
 
diff --git a/src/pydolphinscheduler/configuration.py 
b/src/pydolphinscheduler/configuration.py
index 860f986..2f0c2c0 100644
--- a/src/pydolphinscheduler/configuration.py
+++ b/src/pydolphinscheduler/configuration.py
@@ -189,5 +189,8 @@ WORKFLOW_TIME_ZONE = os.environ.get(
 WORKFLOW_WARNING_TYPE = os.environ.get(
     "PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type")
 )
+WORKFLOW_EXECUTION_TYPE = os.environ.get(
+    "PYDS_WORKFLOW_EXECUTION_TYPE", 
configs.get("default.workflow.execution_type")
+)
 
 # End Common Configuration Setting
diff --git a/src/pydolphinscheduler/core/process_definition.py 
b/src/pydolphinscheduler/core/process_definition.py
index 62de7ed..3487435 100644
--- a/src/pydolphinscheduler/core/process_definition.py
+++ b/src/pydolphinscheduler/core/process_definition.py
@@ -57,6 +57,20 @@ class ProcessDefinition(Base):
 
     TODO: maybe we should rename this class, currently use DS object name.
 
+    :param execution_type: Decide how to run a process definition when it has multiple instances.
+        When the process definition's schedule interval is too short, multiple instances may run at the
+        same time. Use this parameter to control how those process definition instances are run.
+        Currently there are four execution types:
+
+          * ``PARALLEL``: Default value, all instances are allowed to run even if the previous
+            instance has not finished.
+          * ``SERIAL_WAIT``: Each instance waits for the previous instance to finish, and all
+            the waiting instances are executed in scheduling order.
+          * ``SERIAL_DISCARD``: An instance is discarded (abandoned) if the previous instance has not
+            finished.
+          * ``SERIAL_PRIORITY``: Each instance waits for the previous instance to finish, and
+            all the waiting instances are executed in process definition priority order.
+
     :param user: The user for current process definition. Will create a new 
one if it do not exists. If your
         parameter ``project`` already exists but project's create do not 
belongs to ``user``, will grant
         ``project`` to ``user`` automatically.
@@ -86,6 +100,7 @@ class ProcessDefinition(Base):
         "worker_group",
         "warning_type",
         "warning_group_id",
+        "execution_type",
         "timeout",
         "release_state",
         "param",
@@ -109,6 +124,7 @@ class ProcessDefinition(Base):
         worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
         warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
         warning_group_id: Optional[int] = 0,
+        execution_type: Optional[str] = configuration.WORKFLOW_EXECUTION_TYPE,
         timeout: Optional[int] = 0,
         release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
         param: Optional[Dict] = None,
@@ -132,6 +148,17 @@ class ProcessDefinition(Base):
         else:
             self.warning_type = warning_type.strip().upper()
         self.warning_group_id = warning_group_id
+        if execution_type is None or execution_type.strip().upper() not in (
+            "PARALLEL",
+            "SERIAL_WAIT",
+            "SERIAL_DISCARD",
+            "SERIAL_PRIORITY",
+        ):
+            raise PyDSParamException(
+                "Parameter `execution_type` with unexpect value `%s`", 
execution_type
+            )
+        else:
+            self._execution_type = execution_type
         self.timeout = timeout
         self._release_state = release_state
         self.param = param
@@ -225,6 +252,16 @@ class ProcessDefinition(Base):
         """Set attribute release_state."""
         self._release_state = val.lower()
 
+    @property
+    def execution_type(self) -> str:
+        """Get attribute execution_type."""
+        return self._execution_type.upper()
+
+    @execution_type.setter
+    def execution_type(self, val: str) -> None:
+        """Set attribute execution_type."""
+        self._execution_type = val
+
     @property
     def param_json(self) -> Optional[List[Dict]]:
         """Return param json base on self.param."""
@@ -390,6 +427,7 @@ class ProcessDefinition(Base):
             json.dumps(self.param_json),
             self.warning_type,
             self.warning_group_id,
+            self.execution_type,
             self.timeout,
             self.worker_group,
             self._tenant,
@@ -399,7 +437,6 @@ class ProcessDefinition(Base):
             json.dumps(self.task_definition_json),
             json.dumps(self.schedule_json) if self.schedule_json else None,
             None,
-            None,
         )
         if len(self.resource_list) > 0:
             for res in self.resource_list:
diff --git a/src/pydolphinscheduler/default_config.yaml 
b/src/pydolphinscheduler/default_config.yaml
index 98d7b99..5ad3064 100644
--- a/src/pydolphinscheduler/default_config.yaml
+++ b/src/pydolphinscheduler/default_config.yaml
@@ -56,3 +56,6 @@ default:
     # change to ``FAILURE`` if you want to warn users when workflow failed. 
All available enum value are
     # ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL`` 
     warning_type: NONE
+    # Default execution type describing how multiple workflow instances run. The default value is ``parallel``,
+    # which runs all workflow instances in parallel. Other available values are ``SERIAL_WAIT``, ``SERIAL_DISCARD``, ``SERIAL_PRIORITY``
+    execution_type: parallel
diff --git a/src/pydolphinscheduler/java_gateway.py 
b/src/pydolphinscheduler/java_gateway.py
index 54bb0a3..cd03d32 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -254,6 +254,7 @@ class JavaGate:
         global_params: str,
         warning_type: str,
         warning_group_id: int,
+        execution_type: str,
         timeout: int,
         worker_group: str,
         tenant_code: str,
@@ -262,7 +263,6 @@ class JavaGate:
         task_definition_json: str,
         schedule: Optional[str] = None,
         other_params_json: Optional[str] = None,
-        execution_type: Optional[str] = None,
     ):
         """Create or update process definition through java gateway."""
         return self.java_gateway.entry_point.createOrUpdateProcessDefinition(
diff --git a/tests/core/test_process_definition.py 
b/tests/core/test_process_definition.py
index 30445bf..c8fffc2 100644
--- a/tests/core/test_process_definition.py
+++ b/tests/core/test_process_definition.py
@@ -67,6 +67,7 @@ def test_process_definition_key_attr(func):
         ("worker_group", configuration.WORKFLOW_WORKER_GROUP),
         ("warning_type", configuration.WORKFLOW_WARNING_TYPE),
         ("warning_group_id", 0),
+        ("execution_type", configuration.WORKFLOW_EXECUTION_TYPE.upper()),
         ("release_state", 1),
     ],
 )
@@ -89,6 +90,7 @@ def test_process_definition_default_value(name, value):
         ("worker_group", str, "worker_group"),
         ("warning_type", str, "FAILURE"),
         ("warning_group_id", int, 1),
+        ("execution_type", str, "PARALLEL"),
         ("timeout", int, 1),
         ("param", dict, {"key": "value"}),
         (
@@ -210,6 +212,22 @@ def test_warn_type_not_support_type(val: str):
         ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
 
 
[email protected](
+    "val",
+    [
+        "ALLL",
+        "",
+        None,
+    ],
+)
+def test_execute_type_not_support_type(val: str):
+    """Test process definition param execute_type not support type error."""
+    with pytest.raises(
+        PyDSParamException, match="Parameter `execution_type` with unexpect 
value.*?"
+    ):
+        ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, execution_type=val)
+
+
 @pytest.mark.parametrize(
     "param, expect",
     [
@@ -321,6 +339,7 @@ def test_process_definition_get_define_without_task():
         "workerGroup": configuration.WORKFLOW_WORKER_GROUP,
         "warningType": configuration.WORKFLOW_WARNING_TYPE,
         "warningGroupId": 0,
+        "executionType": "PARALLEL",
         "timeout": 0,
         "releaseState": 1,
         "param": None,
diff --git a/tests/utils/test_yaml_parser.py b/tests/utils/test_yaml_parser.py
index ad3aaf7..3abdda6 100644
--- a/tests/utils/test_yaml_parser.py
+++ b/tests/utils/test_yaml_parser.py
@@ -63,6 +63,7 @@ expects = [
         "default.workflow.release_state": ("online", "offline"),
         "default.workflow.time_zone": ("Asia/Shanghai", "Europe/Amsterdam"),
         "default.workflow.warning_type": ("NONE", "SUCCESS"),
+        "default.workflow.execution_type": ("parallel", "serial_wait"),
     },
 ]
 

Reply via email to