This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new b568f05 fix: missing task_execute_type for task (#140)
b568f05 is described below
commit b568f05ad9c25a7d8b13db1d0de387160504630c
Author: Jay Chung <[email protected]>
AuthorDate: Thu Feb 8 15:37:22 2024 +0800
fix: missing task_execute_type for task (#140)
fix: #138
---
docs/source/tasks/index.rst | 3 +-
docs/source/tasks/sub_workflow.rst | 38 +++++
src/pydolphinscheduler/core/engine.py | 4 +-
src/pydolphinscheduler/core/task.py | 178 +++++++++++++++++++++
.../examples/task_sub_workflow_example.py | 58 +++++++
src/pydolphinscheduler/tasks/condition.py | 4 +-
src/pydolphinscheduler/tasks/datax.py | 6 +-
src/pydolphinscheduler/tasks/dependent.py | 4 +-
src/pydolphinscheduler/tasks/dvc.py | 4 +-
src/pydolphinscheduler/tasks/http.py | 4 +-
src/pydolphinscheduler/tasks/kubernetes.py | 4 +-
src/pydolphinscheduler/tasks/mlflow.py | 4 +-
src/pydolphinscheduler/tasks/openmldb.py | 4 +-
src/pydolphinscheduler/tasks/procedure.py | 4 +-
src/pydolphinscheduler/tasks/python.py | 4 +-
src/pydolphinscheduler/tasks/pytorch.py | 4 +-
src/pydolphinscheduler/tasks/sagemaker.py | 4 +-
src/pydolphinscheduler/tasks/shell.py | 4 +-
src/pydolphinscheduler/tasks/sql.py | 4 +-
src/pydolphinscheduler/tasks/sub_workflow.py | 4 +-
src/pydolphinscheduler/tasks/switch.py | 4 +-
tests/core/test_engine.py | 1 +
tests/testing/constants.py | 1 -
23 files changed, 311 insertions(+), 38 deletions(-)
diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/index.rst
index fa2e73a..153d7ab 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/index.rst
@@ -28,6 +28,7 @@ In this section
sql
python
http
+ sub_workflow
switch
condition
@@ -40,8 +41,6 @@ In this section
kubernetes
datax
- sub_workflow
-
sagemaker
mlflow
openmldb
diff --git a/docs/source/tasks/sub_workflow.rst
b/docs/source/tasks/sub_workflow.rst
index 026131a..eca0168 100644
--- a/docs/source/tasks/sub_workflow.rst
+++ b/docs/source/tasks/sub_workflow.rst
@@ -18,6 +18,44 @@
Sub Workflow
============
+This task triggers an existing workflow run, so you should make sure the workflow exists in the current
project when you create
+a sub workflow task.
+
+Example
+-------
+
+we have a simple example about how to use the sub workflow task. When we want to
create a sub workflow task,
+we should make sure it already exists in the current project. So the first thing
we do is to create a workflow
that will be used as the sub workflow task.
+
+.. literalinclude::
../../../src/pydolphinscheduler/examples/task_sub_workflow_example.py
+ :start-after: [start sub_workflow_declare]
+ :end-before: [end sub_workflow_declare]
+
+the workflow with name ``sub_workflow_upstream`` would be created after we execute the
``submit`` method.
+
+Then we create a main workflow, and the sub workflow task will connect to
workflow we created before.
+
+.. literalinclude::
../../../src/pydolphinscheduler/examples/task_sub_workflow_example.py
+ :start-after: [start sub_workflow_task_declare]
+ :end-before: [end sub_workflow_task_declare]
+
+Finally, we can submit or run the sub workflow task by the ``submit`` or ``run`` method.
And you can also use a workflow
+that already exists in the current project instead of creating a new one.
+
+.. note::
+
+ We can only run the workflow that contains the sub workflow task, and the sub
workflow task will trigger the
+ sub workflow run.
+
+
+.. literalinclude::
../../../src/pydolphinscheduler/examples/task_sub_workflow_example.py
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
.. automodule:: pydolphinscheduler.tasks.sub_workflow
diff --git a/src/pydolphinscheduler/core/engine.py
b/src/pydolphinscheduler/core/engine.py
index 3e36820..b5bf0c8 100644
--- a/src/pydolphinscheduler/core/engine.py
+++ b/src/pydolphinscheduler/core/engine.py
@@ -21,7 +21,7 @@ from __future__ import annotations
from py4j.protocol import Py4JJavaError
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.java_gateway import gateway
@@ -34,7 +34,7 @@ class ProgramType(str):
PYTHON = "PYTHON"
-class Engine(Task):
+class Engine(BatchTask):
"""Task engine object, declare behavior for engine task to
dolphinscheduler.
This is the parent class of spark, flink and mr tasks,
diff --git a/src/pydolphinscheduler/core/task.py
b/src/pydolphinscheduler/core/task.py
index ac48ab8..a8e5fac 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -511,3 +511,181 @@ class Task(Base):
"""
self._output_params[name] = value
+
+
+class BatchTask(Task):
+ """Batch task object, parent class for all batch task types.
+
+ :param name: The name of the task. Node names within the same workflow
must be unique.
+ :param task_type:
+ :param description: default None
+ :param flag: default TaskFlag.YES,
+ :param task_priority: default TaskPriority.MEDIUM
+ :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
+ :param environment_name: default None
+ :param task_group_id: Identify of task group to restrict the parallelism
of tasks instance run, default 0.
+ :param task_group_priority: Priority for same task group to, the higher
the value, the higher the
+ priority, default 0.
+ :param delay_time: default 0
+ :param fail_retry_times: default 0
+ :param fail_retry_interval: default 1
+ :param timeout_notify_strategy: default, None
+ :param timeout: Timeout attribute for task, in minutes. Task is consider
as timed out task when the
+ running time of a task exceeds than this value. when data type is
:class:`datetime.timedelta` will
+ be converted to int(in minutes). default ``None``
+ :param resource_list: default None
+ :param wait_start_timeout: default None
+ :param condition_result: default None,
+ :param resource_plugin: default None
+ :param is_cache: default False
+ :param input_params: default None, input parameters, {param_name:
param_value}
+ :param output_params: default None, input parameters, {param_name:
param_value}
+ """
+
+ _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"}
+
+ def __init__(
+ self,
+ name: str,
+ task_type: str,
+ description: str | None = None,
+ flag: str | None = TaskFlag.YES,
+ task_priority: str | None = TaskPriority.MEDIUM,
+ worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
+ environment_name: str | None = None,
+ task_group_id: int | None = 0,
+ task_group_priority: int | None = 0,
+ delay_time: int | None = 0,
+ fail_retry_times: int | None = 0,
+ fail_retry_interval: int | None = 1,
+ timeout_notify_strategy: str | None = None,
+ timeout: timedelta | int | None = None,
+ workflow: Workflow | None = None,
+ resource_list: list | None = None,
+ dependence: dict | None = None,
+ wait_start_timeout: dict | None = None,
+ condition_result: dict | None = None,
+ resource_plugin: ResourcePlugin | None = None,
+ is_cache: bool | None = False,
+ input_params: dict | None = None,
+ output_params: dict | None = None,
+ *args,
+ **kwargs,
+ ):
+ super().__init__(
+ name,
+ task_type,
+ description,
+ flag,
+ task_priority,
+ worker_group,
+ environment_name,
+ task_group_id,
+ task_group_priority,
+ delay_time,
+ fail_retry_times,
+ fail_retry_interval,
+ timeout_notify_strategy,
+ timeout,
+ workflow,
+ resource_list,
+ dependence,
+ wait_start_timeout,
+ condition_result,
+ resource_plugin,
+ is_cache,
+ input_params,
+ output_params,
+ *args,
+ **kwargs,
+ )
+ self.task_execute_type = "BATCH"
+
+
+class StreamTask(Task):
+ """Stream task object, parent class for all stream task types.
+
+ :param name: The name of the task. Node names within the same workflow
must be unique.
+ :param task_type:
+ :param description: default None
+ :param flag: default TaskFlag.YES,
+ :param task_priority: default TaskPriority.MEDIUM
+ :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
+ :param environment_name: default None
+ :param task_group_id: Identify of task group to restrict the parallelism
of tasks instance run, default 0.
+ :param task_group_priority: Priority for same task group to, the higher
the value, the higher the
+ priority, default 0.
+ :param delay_time: default 0
+ :param fail_retry_times: default 0
+ :param fail_retry_interval: default 1
+ :param timeout_notify_strategy: default, None
+ :param timeout: Timeout attribute for task, in minutes. Task is consider
as timed out task when the
+ running time of a task exceeds than this value. when data type is
:class:`datetime.timedelta` will
+ be converted to int(in minutes). default ``None``
+ :param resource_list: default None
+ :param wait_start_timeout: default None
+ :param condition_result: default None,
+ :param resource_plugin: default None
+ :param is_cache: default False
+ :param input_params: default None, input parameters, {param_name:
param_value}
+ :param output_params: default None, input parameters, {param_name:
param_value}
+ """
+
+ _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"}
+
+ def __init__(
+ self,
+ name: str,
+ task_type: str,
+ description: str | None = None,
+ flag: str | None = TaskFlag.YES,
+ task_priority: str | None = TaskPriority.MEDIUM,
+ worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
+ environment_name: str | None = None,
+ task_group_id: int | None = 0,
+ task_group_priority: int | None = 0,
+ delay_time: int | None = 0,
+ fail_retry_times: int | None = 0,
+ fail_retry_interval: int | None = 1,
+ timeout_notify_strategy: str | None = None,
+ timeout: timedelta | int | None = None,
+ workflow: Workflow | None = None,
+ resource_list: list | None = None,
+ dependence: dict | None = None,
+ wait_start_timeout: dict | None = None,
+ condition_result: dict | None = None,
+ resource_plugin: ResourcePlugin | None = None,
+ is_cache: bool | None = False,
+ input_params: dict | None = None,
+ output_params: dict | None = None,
+ *args,
+ **kwargs,
+ ):
+ super().__init__(
+ name,
+ task_type,
+ description,
+ flag,
+ task_priority,
+ worker_group,
+ environment_name,
+ task_group_id,
+ task_group_priority,
+ delay_time,
+ fail_retry_times,
+ fail_retry_interval,
+ timeout_notify_strategy,
+ timeout,
+ workflow,
+ resource_list,
+ dependence,
+ wait_start_timeout,
+ condition_result,
+ resource_plugin,
+ is_cache,
+ input_params,
+ output_params,
+ *args,
+ **kwargs,
+ )
+ self.task_execute_type = "STREAM"
diff --git a/src/pydolphinscheduler/examples/task_sub_workflow_example.py
b/src/pydolphinscheduler/examples/task_sub_workflow_example.py
new file mode 100644
index 0000000..df7b6ce
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_sub_workflow_example.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""A example workflow for task sub workflow."""
+
+# [start tutorial]
+# [start package_import]
+# Import Workflow object to define your workflow attributes
+from pydolphinscheduler.core.workflow import Workflow
+
+# Import task Shell object cause we would create some shell tasks later
+from pydolphinscheduler.tasks.sub_workflow import SubWorkflow
+from pydolphinscheduler.tasks.shell import Shell
+
+
+# [start workflow_declare]
+# [start sub_workflow_declare]
+with Workflow(name="sub_workflow_downstream") as wf_downstream, Workflow(
+ name="task_sub_workflow_example"
+) as wf_upstream:
+ sub_workflow_ds_task = Shell(
+ name="task_sub_workflow",
+ command="echo 'call sub workflow success!'",
+ workflow=wf_downstream,
+ )
+ wf_downstream.submit()
+ # [end sub_workflow_declare]
+
+ sub_workflow_pre = Shell(
+ name="pre-task",
+ command="echo 'prefix task for sub workflow'",
+ workflow=wf_upstream,
+ )
+ # [start sub_workflow_task_declare]
+ sw_task = SubWorkflow(
+ name="sub_workflow",
+ workflow_name=wf_downstream.name,
+ workflow=wf_upstream,
+ )
+ # [end sub_workflow_task_declare]
+ sub_workflow_pre >> sw_task
+ # Please make sure workflow with name `wf_downstream.name` exists when we
submit or run sub workflow task
+ wf_upstream.run()
+# [end workflow_declare]
diff --git a/src/pydolphinscheduler/tasks/condition.py
b/src/pydolphinscheduler/tasks/condition.py
index 3d8f6f8..9359b5d 100644
--- a/src/pydolphinscheduler/tasks/condition.py
+++ b/src/pydolphinscheduler/tasks/condition.py
@@ -20,7 +20,7 @@
from __future__ import annotations
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask, Task
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.models.base import Base
@@ -154,7 +154,7 @@ class Or(ConditionOperator):
super().__init__(*args)
-class Condition(Task):
+class Condition(BatchTask):
"""Task condition object, declare behavior for condition task to
dolphinscheduler."""
def __init__(
diff --git a/src/pydolphinscheduler/tasks/datax.py
b/src/pydolphinscheduler/tasks/datax.py
index 59edfee..c6734ce 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -21,11 +21,11 @@ from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.mixin import WorkerResourceMixin
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.models.datasource import Datasource
-class CustomDataX(WorkerResourceMixin, Task):
+class CustomDataX(WorkerResourceMixin, BatchTask):
"""Task CustomDatax object, declare behavior for custom DataX task to
dolphinscheduler.
You provider json template for DataX, it can synchronize data according to
the template you provided.
@@ -87,7 +87,7 @@ class CustomDataX(WorkerResourceMixin, Task):
self.add_attr(**kwargs)
-class DataX(WorkerResourceMixin, Task):
+class DataX(WorkerResourceMixin, BatchTask):
"""Task DataX object, declare behavior for DataX task to dolphinscheduler.
It should run database datax job in multiply sql link engine, such as:
diff --git a/src/pydolphinscheduler/tasks/dependent.py
b/src/pydolphinscheduler/tasks/dependent.py
index 6c5ac2b..be716f6 100644
--- a/src/pydolphinscheduler/tasks/dependent.py
+++ b/src/pydolphinscheduler/tasks/dependent.py
@@ -21,7 +21,7 @@ from __future__ import annotations
import warnings
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.exceptions import PyDSJavaGatewayException,
PyDSParamException
from pydolphinscheduler.java_gateway import gateway
from pydolphinscheduler.models.base import Base
@@ -272,7 +272,7 @@ class Or(DependentOperator):
super().__init__(*args)
-class Dependent(Task):
+class Dependent(BatchTask):
"""Task dependent object, declare behavior for dependent task to
dolphinscheduler."""
def __init__(self, name: str, dependence: DependentOperator, *args,
**kwargs):
diff --git a/src/pydolphinscheduler/tasks/dvc.py
b/src/pydolphinscheduler/tasks/dvc.py
index 5cbc6f0..46b2125 100644
--- a/src/pydolphinscheduler/tasks/dvc.py
+++ b/src/pydolphinscheduler/tasks/dvc.py
@@ -21,7 +21,7 @@ from __future__ import annotations
from copy import deepcopy
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
class DvcTaskType(str):
@@ -32,7 +32,7 @@ class DvcTaskType(str):
UPLOAD = "Upload"
-class BaseDVC(Task):
+class BaseDVC(BatchTask):
"""Base class for dvc task."""
dvc_task_type = None
diff --git a/src/pydolphinscheduler/tasks/http.py
b/src/pydolphinscheduler/tasks/http.py
index e64494b..000cc26 100644
--- a/src/pydolphinscheduler/tasks/http.py
+++ b/src/pydolphinscheduler/tasks/http.py
@@ -23,7 +23,7 @@ import warnings
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.parameter import Direction, ParameterHelper
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.exceptions import PyDSParamException
@@ -53,7 +53,7 @@ class HttpCheckCondition:
BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS"
-class Http(Task):
+class Http(BatchTask):
"""Task HTTP object, declare behavior for HTTP task to dolphinscheduler.
:param name: The name or identifier for the HTTP task.
diff --git a/src/pydolphinscheduler/tasks/kubernetes.py
b/src/pydolphinscheduler/tasks/kubernetes.py
index b902927..0152cb2 100644
--- a/src/pydolphinscheduler/tasks/kubernetes.py
+++ b/src/pydolphinscheduler/tasks/kubernetes.py
@@ -17,10 +17,10 @@
"""Task Kubernetes."""
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
-class Kubernetes(Task):
+class Kubernetes(BatchTask):
"""Task Kubernetes object, declare behavior for Kubernetes task to
dolphinscheduler.
:param name: task name
diff --git a/src/pydolphinscheduler/tasks/mlflow.py
b/src/pydolphinscheduler/tasks/mlflow.py
index f61fc86..2d03709 100644
--- a/src/pydolphinscheduler/tasks/mlflow.py
+++ b/src/pydolphinscheduler/tasks/mlflow.py
@@ -21,7 +21,7 @@ from __future__ import annotations
from copy import deepcopy
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
class MLflowTaskType(str):
@@ -50,7 +50,7 @@ DEFAULT_MLFLOW_TRACKING_URI = "http://127.0.0.1:5000"
DEFAULT_VERSION = "master"
-class BaseMLflow(Task):
+class BaseMLflow(BatchTask):
"""Base MLflow task."""
mlflow_task_type = None
diff --git a/src/pydolphinscheduler/tasks/openmldb.py
b/src/pydolphinscheduler/tasks/openmldb.py
index 5dad36e..bb22993 100644
--- a/src/pydolphinscheduler/tasks/openmldb.py
+++ b/src/pydolphinscheduler/tasks/openmldb.py
@@ -18,10 +18,10 @@
"""Task OpenMLDB."""
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
-class OpenMLDB(Task):
+class OpenMLDB(BatchTask):
"""Task OpenMLDB object, declare behavior for OpenMLDB task to
dolphinscheduler.
:param name: task name
diff --git a/src/pydolphinscheduler/tasks/procedure.py
b/src/pydolphinscheduler/tasks/procedure.py
index 807196a..fd22cd3 100644
--- a/src/pydolphinscheduler/tasks/procedure.py
+++ b/src/pydolphinscheduler/tasks/procedure.py
@@ -20,11 +20,11 @@
from __future__ import annotations
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.models.datasource import Datasource
-class Procedure(Task):
+class Procedure(BatchTask):
"""Task Procedure object, declare behavior for Procedure task to
dolphinscheduler.
It should run database procedure job in multiply sql lik engine, such as:
diff --git a/src/pydolphinscheduler/tasks/python.py
b/src/pydolphinscheduler/tasks/python.py
index 8e2729f..80dced7 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -28,13 +28,13 @@ from stmdency.extractor import Extractor
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.mixin import WorkerResourceMixin
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.exceptions import PyDSParamException
log = logging.getLogger(__file__)
-class Python(WorkerResourceMixin, Task):
+class Python(WorkerResourceMixin, BatchTask):
"""Task Python object, declare behavior for Python task to
dolphinscheduler.
Python task support two types of parameters for :param:``definition``, and
here is an example:
diff --git a/src/pydolphinscheduler/tasks/pytorch.py
b/src/pydolphinscheduler/tasks/pytorch.py
index 0ba0d7d..db0d9be 100644
--- a/src/pydolphinscheduler/tasks/pytorch.py
+++ b/src/pydolphinscheduler/tasks/pytorch.py
@@ -20,7 +20,7 @@ from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.mixin import WorkerResourceMixin
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
class DEFAULT:
@@ -31,7 +31,7 @@ class DEFAULT:
python_command = "${PYTHON_HOME}"
-class Pytorch(WorkerResourceMixin, Task):
+class Pytorch(WorkerResourceMixin, BatchTask):
"""Task Pytorch object, declare behavior for Pytorch task to
dolphinscheduler.
See also: `DolphinScheduler Pytorch Task Plugin
diff --git a/src/pydolphinscheduler/tasks/sagemaker.py
b/src/pydolphinscheduler/tasks/sagemaker.py
index 30b128d..01c8fac 100644
--- a/src/pydolphinscheduler/tasks/sagemaker.py
+++ b/src/pydolphinscheduler/tasks/sagemaker.py
@@ -18,10 +18,10 @@
"""Task SageMaker."""
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
-class SageMaker(Task):
+class SageMaker(BatchTask):
"""Task SageMaker object, declare behavior for SageMaker task to
dolphinscheduler.
:param name: A unique, meaningful string for the SageMaker task.
diff --git a/src/pydolphinscheduler/tasks/shell.py
b/src/pydolphinscheduler/tasks/shell.py
index f6795ca..c93980b 100644
--- a/src/pydolphinscheduler/tasks/shell.py
+++ b/src/pydolphinscheduler/tasks/shell.py
@@ -19,10 +19,10 @@
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.mixin import WorkerResourceMixin
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
-class Shell(WorkerResourceMixin, Task):
+class Shell(WorkerResourceMixin, BatchTask):
"""Task shell object, declare behavior for shell task to dolphinscheduler.
:param name: A unique, meaningful string for the shell task.
diff --git a/src/pydolphinscheduler/tasks/sql.py
b/src/pydolphinscheduler/tasks/sql.py
index 520c291..c9c3f49 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -23,7 +23,7 @@ import re
from collections.abc import Sequence
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.models.datasource import Datasource
log = logging.getLogger(__file__)
@@ -36,7 +36,7 @@ class SqlType:
NOT_SELECT = "1"
-class Sql(Task):
+class Sql(BatchTask):
"""Task SQL object, declare behavior for SQL task to dolphinscheduler.
It should run sql job in multiply sql lik engine, such as:
diff --git a/src/pydolphinscheduler/tasks/sub_workflow.py
b/src/pydolphinscheduler/tasks/sub_workflow.py
index 715a21e..4671146 100644
--- a/src/pydolphinscheduler/tasks/sub_workflow.py
+++ b/src/pydolphinscheduler/tasks/sub_workflow.py
@@ -20,12 +20,12 @@
from __future__ import annotations
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.exceptions import PyDSWorkflowNotAssignException
from pydolphinscheduler.java_gateway import gateway
-class SubWorkflow(Task):
+class SubWorkflow(BatchTask):
"""Task SubWorkflow object, declare behavior for SubWorkflow task to
dolphinscheduler."""
_task_custom_attr = {"process_definition_code"}
diff --git a/src/pydolphinscheduler/tasks/switch.py
b/src/pydolphinscheduler/tasks/switch.py
index b0969af..625ac70 100644
--- a/src/pydolphinscheduler/tasks/switch.py
+++ b/src/pydolphinscheduler/tasks/switch.py
@@ -19,7 +19,7 @@
from __future__ import annotations
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.task import BatchTask, Task
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.models.base import Base
@@ -126,7 +126,7 @@ class SwitchCondition(Base):
return super().get_define()
-class Switch(Task):
+class Switch(BatchTask):
"""Task switch object, declare behavior for switch task to
dolphinscheduler.
Param of workflow or at least one local param of task must be set
diff --git a/tests/core/test_engine.py b/tests/core/test_engine.py
index 9c44308..312f27a 100644
--- a/tests/core/test_engine.py
+++ b/tests/core/test_engine.py
@@ -133,6 +133,7 @@ def test_property_task_params(mock_resource,
mock_code_version, attr, expect):
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0,
+ "taskExecuteType": "BATCH",
},
)
],
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 07a3249..3b106c1 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -23,7 +23,6 @@ import os
# but most of them just without adding by mistake, and we should add it later.
task_without_example = {
"http",
- "sub_workflow",
"procedure",
}