This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new b568f05  fix: missing task_execute_type for task (#140)
b568f05 is described below

commit b568f05ad9c25a7d8b13db1d0de387160504630c
Author: Jay Chung <[email protected]>
AuthorDate: Thu Feb 8 15:37:22 2024 +0800

    fix: missing task_execute_type for task (#140)
    
    fix: #138
---
 docs/source/tasks/index.rst                        |   3 +-
 docs/source/tasks/sub_workflow.rst                 |  38 +++++
 src/pydolphinscheduler/core/engine.py              |   4 +-
 src/pydolphinscheduler/core/task.py                | 178 +++++++++++++++++++++
 .../examples/task_sub_workflow_example.py          |  58 +++++++
 src/pydolphinscheduler/tasks/condition.py          |   4 +-
 src/pydolphinscheduler/tasks/datax.py              |   6 +-
 src/pydolphinscheduler/tasks/dependent.py          |   4 +-
 src/pydolphinscheduler/tasks/dvc.py                |   4 +-
 src/pydolphinscheduler/tasks/http.py               |   4 +-
 src/pydolphinscheduler/tasks/kubernetes.py         |   4 +-
 src/pydolphinscheduler/tasks/mlflow.py             |   4 +-
 src/pydolphinscheduler/tasks/openmldb.py           |   4 +-
 src/pydolphinscheduler/tasks/procedure.py          |   4 +-
 src/pydolphinscheduler/tasks/python.py             |   4 +-
 src/pydolphinscheduler/tasks/pytorch.py            |   4 +-
 src/pydolphinscheduler/tasks/sagemaker.py          |   4 +-
 src/pydolphinscheduler/tasks/shell.py              |   4 +-
 src/pydolphinscheduler/tasks/sql.py                |   4 +-
 src/pydolphinscheduler/tasks/sub_workflow.py       |   4 +-
 src/pydolphinscheduler/tasks/switch.py             |   4 +-
 tests/core/test_engine.py                          |   1 +
 tests/testing/constants.py                         |   1 -
 23 files changed, 311 insertions(+), 38 deletions(-)

diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/index.rst
index fa2e73a..153d7ab 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/index.rst
@@ -28,6 +28,7 @@ In this section
    sql
    python
    http
+   sub_workflow
 
    switch
    condition
@@ -40,8 +41,6 @@ In this section
    kubernetes
 
    datax
-   sub_workflow
-
    sagemaker
    mlflow
    openmldb
diff --git a/docs/source/tasks/sub_workflow.rst 
b/docs/source/tasks/sub_workflow.rst
index 026131a..eca0168 100644
--- a/docs/source/tasks/sub_workflow.rst
+++ b/docs/source/tasks/sub_workflow.rst
@@ -18,6 +18,44 @@
 Sub Workflow
 ============
 
+This task triggers an existing workflow run, so you should make sure the 
workflow exists in the current project when you create a
+sub workflow task.
+
+Example
+-------
+
+We have a simple example about how to use the sub workflow task. When we want 
to create a sub workflow task,
+we should make sure it already exists in the current project. So the first 
thing we do is to create a workflow
+that will be used as the sub workflow.
+
+.. literalinclude:: 
../../../src/pydolphinscheduler/examples/task_sub_workflow_example.py
+   :start-after: [start sub_workflow_declare]
+   :end-before: [end sub_workflow_declare]
+
+The workflow with name ``sub_workflow_upstream`` would be created after we 
execute the ``submit`` method.
+
+Then we create a main workflow, and the sub workflow task will connect to 
workflow we created before.
+
+.. literalinclude:: 
../../../src/pydolphinscheduler/examples/task_sub_workflow_example.py
+   :start-after: [start sub_workflow_task_declare]
+   :end-before: [end sub_workflow_task_declare]
+
+Finally, we can submit or run the sub workflow task by the ``submit`` or 
``run`` method. You can also use a workflow
+that already exists in the current project instead of creating a new one.
+
+.. note::
+
+    We can only run the workflow that contains the sub workflow task, and the 
sub workflow task will trigger the
+    sub workflow run.
+
+
+.. literalinclude:: 
../../../src/pydolphinscheduler/examples/task_sub_workflow_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
 .. automodule:: pydolphinscheduler.tasks.sub_workflow
 
 
diff --git a/src/pydolphinscheduler/core/engine.py 
b/src/pydolphinscheduler/core/engine.py
index 3e36820..b5bf0c8 100644
--- a/src/pydolphinscheduler/core/engine.py
+++ b/src/pydolphinscheduler/core/engine.py
@@ -21,7 +21,7 @@ from __future__ import annotations
 
 from py4j.protocol import Py4JJavaError
 
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.java_gateway import gateway
 
@@ -34,7 +34,7 @@ class ProgramType(str):
     PYTHON = "PYTHON"
 
 
-class Engine(Task):
+class Engine(BatchTask):
     """Task engine object, declare behavior for engine task to 
dolphinscheduler.
 
     This is the parent class of spark, flink and mr tasks,
diff --git a/src/pydolphinscheduler/core/task.py 
b/src/pydolphinscheduler/core/task.py
index ac48ab8..a8e5fac 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -511,3 +511,181 @@ class Task(Base):
 
         """
         self._output_params[name] = value
+
+
+class BatchTask(Task):
+    """Batch task object, parent class for all batch task types.
+
+    :param name: The name of the task. Node names within the same workflow 
must be unique.
+    :param task_type:
+    :param description: default None
+    :param flag: default TaskFlag.YES,
+    :param task_priority: default TaskPriority.MEDIUM
+    :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
+    :param environment_name: default None
+    :param task_group_id: Identify of task group to restrict the parallelism 
of tasks instance run, default 0.
+    :param task_group_priority: Priority for same task group to, the higher 
the value, the higher the
+        priority, default 0.
+    :param delay_time: default 0
+    :param fail_retry_times: default 0
+    :param fail_retry_interval: default 1
+    :param timeout_notify_strategy: default, None
+    :param timeout: Timeout attribute for task, in minutes. Task is consider 
as  timed out task when the
+        running time of a task exceeds than this value. when data type is 
:class:`datetime.timedelta` will
+        be converted to int(in minutes). default ``None``
+    :param resource_list: default None
+    :param wait_start_timeout: default None
+    :param condition_result: default None,
+    :param resource_plugin: default None
+    :param is_cache: default False
+    :param input_params: default None, input parameters, {param_name: 
param_value}
+    :param output_params: default None, output parameters, {param_name: 
param_value}
+    """
+
+    _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"}
+
+    def __init__(
+        self,
+        name: str,
+        task_type: str,
+        description: str | None = None,
+        flag: str | None = TaskFlag.YES,
+        task_priority: str | None = TaskPriority.MEDIUM,
+        worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
+        environment_name: str | None = None,
+        task_group_id: int | None = 0,
+        task_group_priority: int | None = 0,
+        delay_time: int | None = 0,
+        fail_retry_times: int | None = 0,
+        fail_retry_interval: int | None = 1,
+        timeout_notify_strategy: str | None = None,
+        timeout: timedelta | int | None = None,
+        workflow: Workflow | None = None,
+        resource_list: list | None = None,
+        dependence: dict | None = None,
+        wait_start_timeout: dict | None = None,
+        condition_result: dict | None = None,
+        resource_plugin: ResourcePlugin | None = None,
+        is_cache: bool | None = False,
+        input_params: dict | None = None,
+        output_params: dict | None = None,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(
+            name,
+            task_type,
+            description,
+            flag,
+            task_priority,
+            worker_group,
+            environment_name,
+            task_group_id,
+            task_group_priority,
+            delay_time,
+            fail_retry_times,
+            fail_retry_interval,
+            timeout_notify_strategy,
+            timeout,
+            workflow,
+            resource_list,
+            dependence,
+            wait_start_timeout,
+            condition_result,
+            resource_plugin,
+            is_cache,
+            input_params,
+            output_params,
+            *args,
+            **kwargs,
+        )
+        self.task_execute_type = "BATCH"
+
+
+class StreamTask(Task):
+    """Stream task object, parent class for all stream task types.
+
+    :param name: The name of the task. Node names within the same workflow 
must be unique.
+    :param task_type:
+    :param description: default None
+    :param flag: default TaskFlag.YES,
+    :param task_priority: default TaskPriority.MEDIUM
+    :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
+    :param environment_name: default None
+    :param task_group_id: Identify of task group to restrict the parallelism 
of tasks instance run, default 0.
+    :param task_group_priority: Priority for same task group to, the higher 
the value, the higher the
+        priority, default 0.
+    :param delay_time: default 0
+    :param fail_retry_times: default 0
+    :param fail_retry_interval: default 1
+    :param timeout_notify_strategy: default, None
+    :param timeout: Timeout attribute for task, in minutes. Task is consider 
as  timed out task when the
+        running time of a task exceeds than this value. when data type is 
:class:`datetime.timedelta` will
+        be converted to int(in minutes). default ``None``
+    :param resource_list: default None
+    :param wait_start_timeout: default None
+    :param condition_result: default None,
+    :param resource_plugin: default None
+    :param is_cache: default False
+    :param input_params: default None, input parameters, {param_name: 
param_value}
+    :param output_params: default None, output parameters, {param_name: 
param_value}
+    """
+
+    _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"}
+
+    def __init__(
+        self,
+        name: str,
+        task_type: str,
+        description: str | None = None,
+        flag: str | None = TaskFlag.YES,
+        task_priority: str | None = TaskPriority.MEDIUM,
+        worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
+        environment_name: str | None = None,
+        task_group_id: int | None = 0,
+        task_group_priority: int | None = 0,
+        delay_time: int | None = 0,
+        fail_retry_times: int | None = 0,
+        fail_retry_interval: int | None = 1,
+        timeout_notify_strategy: str | None = None,
+        timeout: timedelta | int | None = None,
+        workflow: Workflow | None = None,
+        resource_list: list | None = None,
+        dependence: dict | None = None,
+        wait_start_timeout: dict | None = None,
+        condition_result: dict | None = None,
+        resource_plugin: ResourcePlugin | None = None,
+        is_cache: bool | None = False,
+        input_params: dict | None = None,
+        output_params: dict | None = None,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(
+            name,
+            task_type,
+            description,
+            flag,
+            task_priority,
+            worker_group,
+            environment_name,
+            task_group_id,
+            task_group_priority,
+            delay_time,
+            fail_retry_times,
+            fail_retry_interval,
+            timeout_notify_strategy,
+            timeout,
+            workflow,
+            resource_list,
+            dependence,
+            wait_start_timeout,
+            condition_result,
+            resource_plugin,
+            is_cache,
+            input_params,
+            output_params,
+            *args,
+            **kwargs,
+        )
+        self.task_execute_type = "STREAM"
diff --git a/src/pydolphinscheduler/examples/task_sub_workflow_example.py 
b/src/pydolphinscheduler/examples/task_sub_workflow_example.py
new file mode 100644
index 0000000..df7b6ce
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_sub_workflow_example.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""A example workflow for task sub workflow."""
+
+# [start tutorial]
+# [start package_import]
+# Import Workflow object to define your workflow attributes
+from pydolphinscheduler.core.workflow import Workflow
+
+# Import task Shell object cause we would create some shell tasks later
+from pydolphinscheduler.tasks.sub_workflow import SubWorkflow
+from pydolphinscheduler.tasks.shell import Shell
+
+
+# [start workflow_declare]
+# [start sub_workflow_declare]
+with Workflow(name="sub_workflow_downstream") as wf_downstream, Workflow(
+    name="task_sub_workflow_example"
+) as wf_upstream:
+    sub_workflow_ds_task = Shell(
+        name="task_sub_workflow",
+        command="echo 'call sub workflow success!'",
+        workflow=wf_downstream,
+    )
+    wf_downstream.submit()
+    # [end sub_workflow_declare]
+
+    sub_workflow_pre = Shell(
+        name="pre-task",
+        command="echo 'prefix task for sub workflow'",
+        workflow=wf_upstream,
+    )
+    # [start sub_workflow_task_declare]
+    sw_task = SubWorkflow(
+        name="sub_workflow",
+        workflow_name=wf_downstream.name,
+        workflow=wf_upstream,
+    )
+    # [end sub_workflow_task_declare]
+    sub_workflow_pre >> sw_task
+    # Please make sure workflow with name `wf_downstream.name` exists when we 
submit or run sub workflow task
+    wf_upstream.run()
+# [end workflow_declare]
diff --git a/src/pydolphinscheduler/tasks/condition.py 
b/src/pydolphinscheduler/tasks/condition.py
index 3d8f6f8..9359b5d 100644
--- a/src/pydolphinscheduler/tasks/condition.py
+++ b/src/pydolphinscheduler/tasks/condition.py
@@ -20,7 +20,7 @@
 from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask, Task
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.models.base import Base
 
@@ -154,7 +154,7 @@ class Or(ConditionOperator):
         super().__init__(*args)
 
 
-class Condition(Task):
+class Condition(BatchTask):
     """Task condition object, declare behavior for condition task to 
dolphinscheduler."""
 
     def __init__(
diff --git a/src/pydolphinscheduler/tasks/datax.py 
b/src/pydolphinscheduler/tasks/datax.py
index 59edfee..c6734ce 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -21,11 +21,11 @@ from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.mixin import WorkerResourceMixin
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 from pydolphinscheduler.models.datasource import Datasource
 
 
-class CustomDataX(WorkerResourceMixin, Task):
+class CustomDataX(WorkerResourceMixin, BatchTask):
     """Task CustomDatax object, declare behavior for custom DataX task to 
dolphinscheduler.
 
     You provider json template for DataX, it can synchronize data according to 
the template you provided.
@@ -87,7 +87,7 @@ class CustomDataX(WorkerResourceMixin, Task):
         self.add_attr(**kwargs)
 
 
-class DataX(WorkerResourceMixin, Task):
+class DataX(WorkerResourceMixin, BatchTask):
     """Task DataX object, declare behavior for DataX task to dolphinscheduler.
 
     It should run database datax job in multiply sql link engine, such as:
diff --git a/src/pydolphinscheduler/tasks/dependent.py 
b/src/pydolphinscheduler/tasks/dependent.py
index 6c5ac2b..be716f6 100644
--- a/src/pydolphinscheduler/tasks/dependent.py
+++ b/src/pydolphinscheduler/tasks/dependent.py
@@ -21,7 +21,7 @@ from __future__ import annotations
 import warnings
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 from pydolphinscheduler.exceptions import PyDSJavaGatewayException, 
PyDSParamException
 from pydolphinscheduler.java_gateway import gateway
 from pydolphinscheduler.models.base import Base
@@ -272,7 +272,7 @@ class Or(DependentOperator):
         super().__init__(*args)
 
 
-class Dependent(Task):
+class Dependent(BatchTask):
     """Task dependent object, declare behavior for dependent task to 
dolphinscheduler."""
 
     def __init__(self, name: str, dependence: DependentOperator, *args, 
**kwargs):
diff --git a/src/pydolphinscheduler/tasks/dvc.py 
b/src/pydolphinscheduler/tasks/dvc.py
index 5cbc6f0..46b2125 100644
--- a/src/pydolphinscheduler/tasks/dvc.py
+++ b/src/pydolphinscheduler/tasks/dvc.py
@@ -21,7 +21,7 @@ from __future__ import annotations
 from copy import deepcopy
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 
 
 class DvcTaskType(str):
@@ -32,7 +32,7 @@ class DvcTaskType(str):
     UPLOAD = "Upload"
 
 
-class BaseDVC(Task):
+class BaseDVC(BatchTask):
     """Base class for dvc task."""
 
     dvc_task_type = None
diff --git a/src/pydolphinscheduler/tasks/http.py 
b/src/pydolphinscheduler/tasks/http.py
index e64494b..000cc26 100644
--- a/src/pydolphinscheduler/tasks/http.py
+++ b/src/pydolphinscheduler/tasks/http.py
@@ -23,7 +23,7 @@ import warnings
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.parameter import Direction, ParameterHelper
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 from pydolphinscheduler.exceptions import PyDSParamException
 
 
@@ -53,7 +53,7 @@ class HttpCheckCondition:
     BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS"
 
 
-class Http(Task):
+class Http(BatchTask):
     """Task HTTP object, declare behavior for HTTP task to dolphinscheduler.
 
         :param name: The name or identifier for the HTTP task.
diff --git a/src/pydolphinscheduler/tasks/kubernetes.py 
b/src/pydolphinscheduler/tasks/kubernetes.py
index b902927..0152cb2 100644
--- a/src/pydolphinscheduler/tasks/kubernetes.py
+++ b/src/pydolphinscheduler/tasks/kubernetes.py
@@ -17,10 +17,10 @@
 
 """Task Kubernetes."""
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 
 
-class Kubernetes(Task):
+class Kubernetes(BatchTask):
     """Task Kubernetes object, declare behavior for Kubernetes task to 
dolphinscheduler.
 
     :param name: task name
diff --git a/src/pydolphinscheduler/tasks/mlflow.py 
b/src/pydolphinscheduler/tasks/mlflow.py
index f61fc86..2d03709 100644
--- a/src/pydolphinscheduler/tasks/mlflow.py
+++ b/src/pydolphinscheduler/tasks/mlflow.py
@@ -21,7 +21,7 @@ from __future__ import annotations
 from copy import deepcopy
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 
 
 class MLflowTaskType(str):
@@ -50,7 +50,7 @@ DEFAULT_MLFLOW_TRACKING_URI = "http://127.0.0.1:5000";
 DEFAULT_VERSION = "master"
 
 
-class BaseMLflow(Task):
+class BaseMLflow(BatchTask):
     """Base MLflow task."""
 
     mlflow_task_type = None
diff --git a/src/pydolphinscheduler/tasks/openmldb.py 
b/src/pydolphinscheduler/tasks/openmldb.py
index 5dad36e..bb22993 100644
--- a/src/pydolphinscheduler/tasks/openmldb.py
+++ b/src/pydolphinscheduler/tasks/openmldb.py
@@ -18,10 +18,10 @@
 """Task OpenMLDB."""
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 
 
-class OpenMLDB(Task):
+class OpenMLDB(BatchTask):
     """Task OpenMLDB object, declare behavior for OpenMLDB task to 
dolphinscheduler.
 
     :param name: task name
diff --git a/src/pydolphinscheduler/tasks/procedure.py 
b/src/pydolphinscheduler/tasks/procedure.py
index 807196a..fd22cd3 100644
--- a/src/pydolphinscheduler/tasks/procedure.py
+++ b/src/pydolphinscheduler/tasks/procedure.py
@@ -20,11 +20,11 @@
 from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 from pydolphinscheduler.models.datasource import Datasource
 
 
-class Procedure(Task):
+class Procedure(BatchTask):
     """Task Procedure object, declare behavior for Procedure task to 
dolphinscheduler.
 
     It should run database procedure job in multiply sql lik engine, such as:
diff --git a/src/pydolphinscheduler/tasks/python.py 
b/src/pydolphinscheduler/tasks/python.py
index 8e2729f..80dced7 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -28,13 +28,13 @@ from stmdency.extractor import Extractor
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.mixin import WorkerResourceMixin
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 from pydolphinscheduler.exceptions import PyDSParamException
 
 log = logging.getLogger(__file__)
 
 
-class Python(WorkerResourceMixin, Task):
+class Python(WorkerResourceMixin, BatchTask):
     """Task Python object, declare behavior for Python task to 
dolphinscheduler.
 
     Python task support two types of parameters for :param:``definition``, and 
here is an example:
diff --git a/src/pydolphinscheduler/tasks/pytorch.py 
b/src/pydolphinscheduler/tasks/pytorch.py
index 0ba0d7d..db0d9be 100644
--- a/src/pydolphinscheduler/tasks/pytorch.py
+++ b/src/pydolphinscheduler/tasks/pytorch.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.mixin import WorkerResourceMixin
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 
 
 class DEFAULT:
@@ -31,7 +31,7 @@ class DEFAULT:
     python_command = "${PYTHON_HOME}"
 
 
-class Pytorch(WorkerResourceMixin, Task):
+class Pytorch(WorkerResourceMixin, BatchTask):
     """Task Pytorch object, declare behavior for Pytorch task to 
dolphinscheduler.
 
     See also: `DolphinScheduler Pytorch Task Plugin
diff --git a/src/pydolphinscheduler/tasks/sagemaker.py 
b/src/pydolphinscheduler/tasks/sagemaker.py
index 30b128d..01c8fac 100644
--- a/src/pydolphinscheduler/tasks/sagemaker.py
+++ b/src/pydolphinscheduler/tasks/sagemaker.py
@@ -18,10 +18,10 @@
 """Task SageMaker."""
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 
 
-class SageMaker(Task):
+class SageMaker(BatchTask):
     """Task SageMaker object, declare behavior for SageMaker task to 
dolphinscheduler.
 
     :param name: A unique, meaningful string for the SageMaker task.
diff --git a/src/pydolphinscheduler/tasks/shell.py 
b/src/pydolphinscheduler/tasks/shell.py
index f6795ca..c93980b 100644
--- a/src/pydolphinscheduler/tasks/shell.py
+++ b/src/pydolphinscheduler/tasks/shell.py
@@ -19,10 +19,10 @@
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.mixin import WorkerResourceMixin
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 
 
-class Shell(WorkerResourceMixin, Task):
+class Shell(WorkerResourceMixin, BatchTask):
     """Task shell object, declare behavior for shell task to dolphinscheduler.
 
     :param name: A unique, meaningful string for the shell task.
diff --git a/src/pydolphinscheduler/tasks/sql.py 
b/src/pydolphinscheduler/tasks/sql.py
index 520c291..c9c3f49 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -23,7 +23,7 @@ import re
 from collections.abc import Sequence
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 from pydolphinscheduler.models.datasource import Datasource
 
 log = logging.getLogger(__file__)
@@ -36,7 +36,7 @@ class SqlType:
     NOT_SELECT = "1"
 
 
-class Sql(Task):
+class Sql(BatchTask):
     """Task SQL object, declare behavior for SQL task to dolphinscheduler.
 
     It should run sql job in multiply sql lik engine, such as:
diff --git a/src/pydolphinscheduler/tasks/sub_workflow.py 
b/src/pydolphinscheduler/tasks/sub_workflow.py
index 715a21e..4671146 100644
--- a/src/pydolphinscheduler/tasks/sub_workflow.py
+++ b/src/pydolphinscheduler/tasks/sub_workflow.py
@@ -20,12 +20,12 @@
 from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
 from pydolphinscheduler.exceptions import PyDSWorkflowNotAssignException
 from pydolphinscheduler.java_gateway import gateway
 
 
-class SubWorkflow(Task):
+class SubWorkflow(BatchTask):
     """Task SubWorkflow object, declare behavior for SubWorkflow task to 
dolphinscheduler."""
 
     _task_custom_attr = {"process_definition_code"}
diff --git a/src/pydolphinscheduler/tasks/switch.py 
b/src/pydolphinscheduler/tasks/switch.py
index b0969af..625ac70 100644
--- a/src/pydolphinscheduler/tasks/switch.py
+++ b/src/pydolphinscheduler/tasks/switch.py
@@ -19,7 +19,7 @@
 from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask, Task
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.models.base import Base
 
@@ -126,7 +126,7 @@ class SwitchCondition(Base):
         return super().get_define()
 
 
-class Switch(Task):
+class Switch(BatchTask):
     """Task switch object, declare behavior for switch task to 
dolphinscheduler.
 
     Param of workflow or at least one local param of task must be set
diff --git a/tests/core/test_engine.py b/tests/core/test_engine.py
index 9c44308..312f27a 100644
--- a/tests/core/test_engine.py
+++ b/tests/core/test_engine.py
@@ -133,6 +133,7 @@ def test_property_task_params(mock_resource, 
mock_code_version, attr, expect):
                 "timeoutFlag": "CLOSE",
                 "timeoutNotifyStrategy": None,
                 "timeout": 0,
+                "taskExecuteType": "BATCH",
             },
         )
     ],
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 07a3249..3b106c1 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -23,7 +23,6 @@ import os
 # but most of them just without adding by mistake, and we should add it later.
 task_without_example = {
     "http",
-    "sub_workflow",
     "procedure",
 }
 

Reply via email to