This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 48586a3 Add WorkerResourceMixin better way to set CPU quotas and max memory (#110)
48586a3 is described below
commit 48586a3702df3f708044c8c03eedd442f21cf24f
Author: sofyc <[email protected]>
AuthorDate: Thu Oct 26 01:36:32 2023 -0700
Add WorkerResourceMixin better way to set CPU quotas and max memory (#110)
---
docs/source/concept.rst | 36 +++++------
docs/source/start.rst | 62 +++++++++---------
docs/source/tasks/datax.rst | 10 +++
docs/source/tasks/python.rst | 21 ++++++
docs/source/tasks/pytorch.rst | 11 ++++
docs/source/tasks/shell.rst | 9 +++
docs/source/tutorial.rst | 22 +++----
setup.cfg | 4 +-
src/pydolphinscheduler/core/mixin.py | 39 +++++++++++
src/pydolphinscheduler/core/parameter.py | 2 +-
src/pydolphinscheduler/core/task.py | 4 +-
.../examples/task_datax_example.py | 12 ++++
.../examples/task_python_example.py | 43 +++++++++++++
.../examples/task_pytorch_example.py | 12 ++++
src/pydolphinscheduler/examples/tutorial.py | 11 +++-
src/pydolphinscheduler/models/base.py | 2 +-
src/pydolphinscheduler/tasks/datax.py | 7 +-
src/pydolphinscheduler/tasks/python.py | 4 +-
src/pydolphinscheduler/tasks/pytorch.py | 4 +-
src/pydolphinscheduler/tasks/shell.py | 4 +-
tests/tasks/test_datax.py | 75 ++++++++++++++++++++++
tests/tasks/test_python.py | 30 +++++++++
tests/tasks/test_pytorch.py | 28 ++++++++
tests/tasks/test_shell.py | 30 +++++++++
tests/testing/constants.py | 1 -
25 files changed, 411 insertions(+), 72 deletions(-)
diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index ab8df5e..4164d78 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -23,10 +23,10 @@ In this section, you would know the core concepts of *PyDolphinScheduler*.
Workflow
--------
-Workflow describe the whole things except `tasks`_ and `tasks dependence`_, which including
+Workflow describes the whole things except `tasks`_ and `tasks dependence`_, which includes
name, schedule interval, schedule start time and end time. You would know scheduler
-Workflow could be initialized in normal assign statement or in context manger.
+Workflow could be initialized in a normal assignment statement or within a context manager.
.. code-block:: python
@@ -37,10 +37,10 @@ Workflow could be initialized in normal assign statement or in context manger.
with Workflow(name="my first workflow") as workflow:
workflow.submit()
-Workflow is the main object communicate between *PyDolphinScheduler* and DolphinScheduler daemon.
-After workflow and task is be declared, you could use `submit` and `run` notify server your definition.
+Workflow is the main object communicating between *PyDolphinScheduler* and the DolphinScheduler daemon.
+After the workflow and tasks are declared, you could use `submit` and `run` to notify the server of your definition.
-If you just want to submit your definition and create workflow, without run it, you should use attribute `submit`.
+If you just want to submit your definition and create the workflow without running it, you should use attribute `submit`.
But if you want to run the workflow after you submit it, you could use attribute `run`.
.. code-block:: python
@@ -54,8 +54,8 @@ But if you want to run the workflow after you submit it, you could use attribute
Schedule
~~~~~~~~
-We use parameter `schedule` determine the schedule interval of workflow, *PyDolphinScheduler* support seven
-asterisks expression, and each of the meaning of position as below
+We use parameter `schedule` to determine the schedule interval of the workflow. *PyDolphinScheduler* supports a seven
+asterisk expression, and the meaning of each position is as below
.. code-block:: text
@@ -136,8 +136,8 @@ Alert is the way to notify user when workflow instance is success or failed. We
Tasks
-----
-Task is the minimum unit running actual job, and it is nodes of DAG, aka directed acyclic graph. You could define
-what you want to in the task. It have some required parameter to make uniqueness and definition.
+Task is the minimum unit running the actual job, and it is a node of the DAG, aka directed acyclic graph. You could define
+what you want in the task. It has some required parameters that make it unique and well defined.
Here we use :py:meth:`pydolphinscheduler.tasks.Shell` as example, parameter `name` and `command` is required and must be provider. Parameter
`name` set name to the task, and parameter `command` declare the command you wish to run in this task.
@@ -147,15 +147,15 @@ Here we use :py:meth:`pydolphinscheduler.tasks.Shell` as example, parameter `nam
# We named this task as "shell", and just run command `echo shell task`
shell_task = Shell(name="shell", command="echo shell task")
-If you want to see all type of tasks, you could see :doc:`tasks/index`.
+If you want to see all types of tasks, you could see :doc:`tasks/index`.
Tasks Dependence
~~~~~~~~~~~~~~~~
-You could define many tasks in on single `Workflow`_. If all those task is in parallel processing,
-then you could leave them alone without adding any additional information. But if there have some tasks should
-not be run unless pre task in workflow have be done, we should set task dependence to them. Set tasks dependence
-have two mainly way and both of them is easy. You could use bitwise operator `>>` and `<<`, or task attribute
+You could define many tasks in one single `Workflow`_. If all those tasks are in parallel processing,
+then you could leave them alone without adding any additional information. But if there are some tasks that should
+not be run unless the preceding tasks in the workflow have been done, we should set task dependence on them. There are
+two main ways to set task dependence and both of them are easy. You could use bitwise operators `>>` and `<<`, or task attributes
`set_downstream` and `set_upstream` to do it.
.. code-block:: python
@@ -178,7 +178,7 @@ have two mainly way and both of them is easy. You could use bitwise operator `>>
Task With Workflow
~~~~~~~~~~~~~~~~~~
-In most of data orchestration cases, you should assigned attribute `workflow` to task instance to
+In most data orchestration cases, you should assign the attribute `workflow` to a task instance to
decide workflow of task. You could set `workflow` in both normal assign or in context manger mode
.. code-block:: python
@@ -232,13 +232,13 @@ Resource Files
--------------
During workflow running, we may need some resource files to help us run task usually. One of a common situation
-is that we already have some executable files locally, and we need to schedule in specific time, or add them
-to existing workflow by adding the new tasks. Of cause, we can upload those files to target machine and run them
+is that we already have some executable files locally, and we need to schedule them at a specific time, or add them
+to an existing workflow by adding new tasks. Of course, we can upload those files to the target machine and run them
in :doc:`shell task <tasks/shell>` by reference the absolute path of file. But if we have more than one machine
to run task, we have to upload those files to each of them. And it is not convenient and not flexible, because
we may need to change our resource files sometimes.
-The more pydolphinscheduler way is to upload those files together with `workflow`_, and use them in task to run.
+A more pydolphinscheduler-native way is to upload those files together with `workflow`_, and use them in tasks.
For example, you have a bash script named ``echo-ten.sh`` locally, and it contains some code like this:
.. code-block:: bash
diff --git a/docs/source/start.rst b/docs/source/start.rst
index 434d80e..3c41471 100644
--- a/docs/source/start.rst
+++ b/docs/source/start.rst
@@ -18,7 +18,7 @@
Getting Started
===============
-To get started with *PyDolphinScheduler* you must ensure python and pip
+To get started with *PyDolphinScheduler* you must ensure python and pip are
installed on your machine, if you're already set up, you can skip straight
to `Installing PyDolphinScheduler`_, otherwise please continue with
`Installing Python`_.
@@ -28,16 +28,16 @@ Installing Python
How to install `python` and `pip` depends on what operating system
you're using. The python wiki provides up to date
-`instructions for all platforms here`_. When you entering the website
-and choice your operating system, you would be offered the choice and
-select python version. *PyDolphinScheduler* recommend use version above
-Python 3.6 and we highly recommend you install *Stable Releases* instead
+`instructions for all platforms here`_. When you enter the website
+and choose your operating system, you would be offered the choice to
+select a python version. *PyDolphinScheduler* recommends using a version above
+Python 3.6 and we highly recommend installing *Stable Releases* instead
of *Pre-releases*.
After you have download and installed Python, you should open your terminal,
-typing and running :code:`python --version` to check whether the installation
-is correct or not. If all thing good, you could see the version in console
-without error(here is a example after Python 3.8.7 installed)
+type and run :code:`python --version` to check whether the installation
+is correct or not. If everything is good, you could see the version in console
+without error (here is an example after Python 3.8.7 is installed)
.. code-block:: bash
@@ -49,21 +49,21 @@ Installing PyDolphinScheduler
-----------------------------
After Python is already installed on your machine following section
-`installing Python`_, it easy to *PyDolphinScheduler* by pip.
+`installing Python`_, it is easy to install *PyDolphinScheduler* using pip.
.. code-block:: bash
python -m pip install apache-dolphinscheduler
-The latest version of *PyDolphinScheduler* would be installed after you run above
+The latest version of *PyDolphinScheduler* would be installed after you run the above
command in your terminal. You could go and `start Python Gateway Service`_ to finish
-the prepare, and then go to :doc:`tutorial` to make your hand dirty. But if you
+the preparation, and then go to :doc:`tutorial` to get your hands dirty. But if you
want to install the unreleased version of *PyDolphinScheduler*, you could go and see
-section `installing PyDolphinScheduler in dev branch`_ for more detail.
+section `installing PyDolphinScheduler in dev branch`_ for more details.
.. note::
- Currently, we released multiple pre-release package in PyPI, you can see all released package
+ Currently, we have released multiple pre-release packages in PyPI, you can see all released packages
including pre-release in `release history <https://pypi.org/project/apache-dolphinscheduler/#history>`_.
You can fix the package version if you want to install a pre-release package, for example if
you want to install version `3.0.0-beta-2` package, you can run command
@@ -72,9 +72,9 @@ section `installing PyDolphinScheduler in dev branch`_ for more detail.
Installing PyDolphinScheduler In DEV Branch
-------------------------------------------
-Because the project is developing and some of the features still not release.
-If you want to try some thing unreleased you could install from the source code
-which we hold in GitHub
+The project is still developing and some of the features are not yet released.
+If you want to try something unreleased you could install from the source code
+which we hold on GitHub
.. code-block:: bash
@@ -84,10 +84,10 @@ which we hold in GitHub
python -m pip install -e .
After you installed *PyDolphinScheduler*, please remember `start Python Gateway Service`_
-which waiting for *PyDolphinScheduler*'s workflow definition require.
+which is required for *PyDolphinScheduler*'s workflow definition.
-Above command will clone whole dolphinscheduler source code to local, maybe you want to install latest pydolphinscheduler
-package directly and do not care about other code(including Python gateway service code), you can execute command
+The above command will clone the whole dolphinscheduler source code to local. If you want to install the latest pydolphinscheduler
+package directly and do not care about other code (including Python gateway service code), you can execute the command
.. code-block:: bash
@@ -98,10 +98,10 @@ Start Python Gateway Service
----------------------------
Since **PyDolphinScheduler** is Python API for `Apache DolphinScheduler`_, it
-could define workflow and tasks structure, but could not run it unless you
-`install Apache DolphinScheduler`_ and start its API server which including
-Python gateway service in it. We only and some key steps here and you could
-go `install Apache DolphinScheduler`_ for more detail
+could define workflow and task structures, but could not run it unless you
+`install Apache DolphinScheduler`_ and start its API server which includes
+Python gateway service in it. We only write some key steps here and you could
+go `install Apache DolphinScheduler`_ for more details
.. code-block:: bash
@@ -109,7 +109,7 @@ go `install Apache DolphinScheduler`_ for more detail
./bin/dolphinscheduler-daemon.sh start api-server
To check whether the server is alive or not, you could run :code:`jps`. And
-the server is health if keyword `ApiApplicationServer` in the console.
+the server is healthy if keyword `ApiApplicationServer` is in the console.
.. code-block:: bash
@@ -120,14 +120,14 @@ the server is health if keyword `ApiApplicationServer` in the console.
.. note::
- Please make sure you already enabled started Python gateway service along with `api-server`. The configuration is in
+ Please make sure you have already started the Python gateway service along with `api-server`. The configuration is in
yaml config path `python-gateway.enabled : true` in api-server's configuration path in `api-server/conf/application.yaml`.
- The default value is true and Python gateway service start when api server is been started.
+ The default value is true and the Python gateway service starts when the api server is started.
Run an Example
--------------
-Before run an example for pydolphinscheduler, you should get the example code from it source code. You could run
+Before running an example for pydolphinscheduler, you should get the example code from its source code. You could run a
single bash command to get it
.. code-block:: bash
@@ -156,19 +156,19 @@ from the API server, you should first change pydolphinscheduler configuration an
After that, you could go and see your DolphinScheduler web UI to find out a new workflow created by pydolphinscheduler,
and the path of web UI is `Project -> Workflow -> Workflow Definition`, and you can see a workflow and workflow instance
-had been created and DAG is auto formatter by web UI.
+had been created and the DAG is automatically formatted by the web UI.
.. note::
- We have default authentication token when in first launch dolphinscheduler and pydolphinscheduler. Please change
+ We have a default authentication token when you first launch dolphinscheduler and pydolphinscheduler. Please change
the parameter ``auth_token`` when you deploy in production environment or test dolphinscheduler in public network.
- See :ref:`authentication token <concept:authentication token>` for more detail.
+ See :ref:`authentication token <concept:authentication token>` for more details.
What's More
-----------
-If you do not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But
+If you are not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But
if you already know the basic usage or concept of *PyDolphinScheduler*, you could go and play with all
:doc:`tasks/index` *PyDolphinScheduler* supports, or see our :doc:`howto/index` about useful cases.
diff --git a/docs/source/tasks/datax.rst b/docs/source/tasks/datax.rst
index cb67a2f..f704e3f 100644
--- a/docs/source/tasks/datax.rst
+++ b/docs/source/tasks/datax.rst
@@ -27,6 +27,16 @@ Example
:start-after: [start workflow_declare]
:end-before: [end workflow_declare]
+Resource Limit Example
+----------------------
+
+We can add resource limits like CPU quota and max memory by passing parameters when declaring tasks.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_datax_example.py
+ :start-after: [start resource_limit]
+ :end-before: [end resource_limit]
+
+
Dive Into
---------
diff --git a/docs/source/tasks/python.rst b/docs/source/tasks/python.rst
index 1bf6210..40b0c6d 100644
--- a/docs/source/tasks/python.rst
+++ b/docs/source/tasks/python.rst
@@ -18,6 +18,27 @@
Python
======
+A Python task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_python_example.py
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+
+Resource Limit Example
+----------------------
+
+We can add resource limits like CPU quota and max memory by passing parameters when declaring tasks.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_python_example.py
+ :start-after: [start resource_limit]
+ :end-before: [end resource_limit]
+
+Dive Into
+---------
+
.. automodule:: pydolphinscheduler.tasks.python
diff --git a/docs/source/tasks/pytorch.rst b/docs/source/tasks/pytorch.rst
index 4c7a552..7e3c034 100644
--- a/docs/source/tasks/pytorch.rst
+++ b/docs/source/tasks/pytorch.rst
@@ -28,6 +28,17 @@ Example
:start-after: [start workflow_declare]
:end-before: [end workflow_declare]
+
+Resource Limit Example
+----------------------
+
+We can add resource limits like CPU quota and max memory by passing parameters when declaring tasks.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_pytorch_example.py
+ :start-after: [start resource_limit]
+ :end-before: [end resource_limit]
+
+
Dive Into
---------
diff --git a/docs/source/tasks/shell.rst b/docs/source/tasks/shell.rst
index 2dd106a..13074fc 100644
--- a/docs/source/tasks/shell.rst
+++ b/docs/source/tasks/shell.rst
@@ -27,6 +27,15 @@ Example
:start-after: [start workflow_declare]
:end-before: [end task_relation_declare]
+Resource Limit Example
+----------------------
+
+We can add resource limits like CPU quota and max memory by passing parameters when declaring tasks.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial.py
+ :start-after: [start resource_limit]
+ :end-before: [end resource_limit]
+
Dive Into
---------
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index 695c945..259e3e7 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -20,21 +20,21 @@ Tutorial
This tutorial shows you the basic concept of *PyDolphinScheduler* and tells all
things you should know before you submit or run your first workflow. If you
-still have not installed *PyDolphinScheduler* and start DolphinScheduler, you
-could go and see :ref:`how to getting start PyDolphinScheduler <start:getting started>` firstly.
+still have not installed *PyDolphinScheduler* and started DolphinScheduler, you
+could go and see :ref:`how to get started with PyDolphinScheduler <start:getting started>` first.
Overview of Tutorial
--------------------
-Here have an overview of our tutorial, and it looks a little complex but does not
-worry about that because we explain this example below as detail as possible.
+Here, we have an overview of our tutorial, and it looks a little complex, but don't
+worry about that because we will explain this example below in as much detail as possible.
There are two types of tutorials: traditional and task decorator.
- **Traditional Way**: More general, support many :doc:`built-in task types <tasks/index>`, it is convenient
when you build your workflow at the beginning.
-- **Task Decorator**: A Python decorator allow you to wrap your function into pydolphinscheduler's task. Less
-  versatility to the traditional way because it only supported Python functions and without build-in tasks
+- **Task Decorator**: A Python decorator that allows you to wrap your function into pydolphinscheduler's task. Less
+  versatile than the traditional way because it only supports Python functions, with no built-in tasks
supported. But it is helpful if your workflow is all built with Python or if
you already have some Python
workflow code and want to migrate them to pydolphinscheduler.
- **YAML File**: We can use pydolphinscheduler CLI to create workflow using
YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`.
@@ -94,7 +94,7 @@ We should instantiate :class:`pydolphinscheduler.core.workflow.Workflow` object
import them from `import necessary module`_. Here we declare basic arguments for workflow.
We define the name of :code:`Workflow`, using `Python context manager`_ and it
**the only required argument**
for `Workflow`. Besides, we also declare three arguments named
:code:`schedule` and :code:`start_time`
-which setting workflow schedule interval and schedule start_time, and argument :code:`tenant` defines which tenant
+which set the workflow schedule interval and schedule start_time, and argument :code:`tenant` defines which tenant
will be running this task in the DolphinScheduler worker. See :ref:`section
tenant <concept:tenant>` in
*PyDolphinScheduler* :doc:`concept` for more information.
@@ -119,7 +119,7 @@ will be running this task in the DolphinScheduler worker. See :ref:`section tena
:end-before: # Define the tasks within the workflow
:language: yaml
-We could find more detail about :code:`Workflow` in :ref:`concept about workflow <concept:workflow>`
+We could find more details about :code:`Workflow` in :ref:`concept about workflow <concept:workflow>`
if you are interested in it. For all arguments of object workflow, you could
find in the
:class:`pydolphinscheduler.core.workflow` API documentation.
@@ -128,7 +128,7 @@ Task Declaration
.. tab:: Tradition
- We declare four tasks to show how to create tasks, and both of them are simple tasks of
+ We declare four tasks to show how to create tasks, and all of them are simple tasks of
:class:`pydolphinscheduler.tasks.shell` which runs `echo` command in the
terminal. Besides the argument
`command` with :code:`echo` command, we also need to set the argument
`name` for each task
*(not only shell task, `name` is required for each type of task)*.
@@ -142,7 +142,7 @@ Task Declaration
.. tab:: Task Decorator
- We declare four tasks to show how to create tasks, and both of them are created by the task decorator which
+ We declare four tasks to show how to create tasks, and all of them are created by the task decorator which
using :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is
add a decorator named
:code:`@task` to existing Python function, and then use them inside
:class:`pydolphinscheduler.core.workflow`
@@ -151,7 +151,7 @@ Task Declaration
:start-after: [start task_declare]
:end-before: [end task_declare]
- It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we only use
+ It makes our workflow more Pythonic, but be careful that when we use task decorator mode, it means we only use
Python function as a task and could not use the :doc:`built-in tasks
<tasks/index>` most of the cases.
.. tab:: YAML File
diff --git a/setup.cfg b/setup.cfg
index f8f28d7..cca6044 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -182,7 +182,9 @@ ignore =
D105,
# Conflict to Black
# W503: Line breaks before binary operators
- W503
+ W503,
+ # D400: First line should end with a period
+ D400
per-file-ignores =
*/pydolphinscheduler/side/__init__.py:F401
*/pydolphinscheduler/tasks/__init__.py:F401
diff --git a/src/pydolphinscheduler/core/mixin.py b/src/pydolphinscheduler/core/mixin.py
new file mode 100644
index 0000000..1cf35b3
--- /dev/null
+++ b/src/pydolphinscheduler/core/mixin.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""WorkerResource Mixin"""
+
+
+class WorkerResourceMixin:
+ """Mixin object, declare some attributes for WorkerResource."""
+
+ def add_attr(self, **kwargs):
+ """Add attributes to WorkerResource, including cpu_quota and memory_max for now."""
+ self._cpu_quota = kwargs.get("cpu_quota", -1)
+ self._memory_max = kwargs.get("memory_max", -1)
+ if hasattr(self, "_DEFINE_ATTR"):
+ self._DEFINE_ATTR |= {"cpu_quota", "memory_max"}
+
+ @property
+ def cpu_quota(self):
+ """Get cpu_quota."""
+ return self._cpu_quota
+
+ @property
+ def memory_max(self):
+ """Get memory_max."""
+ return self._memory_max
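For readers skimming the patch, the mixin above composes with any task class via cooperative inheritance. A minimal, self-contained sketch of that composition follows; `FakeTask` and the `Shell` subclass here are illustrative stand-ins, not the project's real `Task` or `Shell` classes:

```python
class WorkerResourceMixin:
    """Declare worker resource attributes (mirrors the patch above)."""

    def add_attr(self, **kwargs):
        # -1 is the patch's default, meaning "no limit" on the worker side.
        self._cpu_quota = kwargs.get("cpu_quota", -1)
        self._memory_max = kwargs.get("memory_max", -1)
        if hasattr(self, "_DEFINE_ATTR"):
            self._DEFINE_ATTR |= {"cpu_quota", "memory_max"}

    @property
    def cpu_quota(self):
        return self._cpu_quota

    @property
    def memory_max(self):
        return self._memory_max


class FakeTask:
    """Hypothetical stand-in for pydolphinscheduler's Task base class."""

    _DEFINE_ATTR = {"name"}

    def __init__(self, name, **kwargs):
        self.name = name


class Shell(WorkerResourceMixin, FakeTask):
    def __init__(self, name, **kwargs):
        super().__init__(name, **kwargs)
        self.add_attr(**kwargs)  # pick up cpu_quota / memory_max from kwargs


task = Shell("demo", cpu_quota=1, memory_max=100)
print(task.cpu_quota, task.memory_max)  # 1 100
```

Because `add_attr` extends `_DEFINE_ATTR`, the serialized task definition would include the two new fields only for classes that opt in to the mixin.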
diff --git a/src/pydolphinscheduler/core/parameter.py b/src/pydolphinscheduler/core/parameter.py
index 6bac357..15ea723 100644
--- a/src/pydolphinscheduler/core/parameter.py
+++ b/src/pydolphinscheduler/core/parameter.py
@@ -49,7 +49,7 @@ class BaseDataType:
def __eq__(self, data):
return (
- type(self) == type(data)
+ type(self) is type(data)
and self.data_type == data.data_type
and self.value == data.value
)
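A side note on the hunk above: classes are objects, so an exact-type check reads naturally with identity (`is`), which is also what pycodestyle's E721 warning recommends. A small sketch of the distinction:

```python
class A:
    pass


class B(A):
    pass


a, b = A(), B()

assert type(a) is A        # classes are singletons, so identity works
assert type(b) is not A    # a subclass is a distinct class object
assert isinstance(b, A)    # isinstance is the subclass-friendly check
```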
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 9ec53f0..9f6b644 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -273,13 +273,13 @@ class Task(Base):
"""Get task define attribute `resource_list`."""
resources = set()
for res in self._resource_list:
- if type(res) == str:
+ if isinstance(res, str):
resources.add(
Resource(
name=res, user_name=self.user_name
).get_fullname_from_database()
)
- elif type(res) == dict and ResourceKey.NAME in res:
+ elif isinstance(res, dict) and ResourceKey.NAME in res:
warnings.warn(
"""`resource_list` should be defined using List[str] with resource paths,
the use of ids to define resources will be remove in version 3.2.0.
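The hunk above swaps `type(res) == str` for `isinstance(res, str)`, which besides satisfying linters also accepts subclasses of `str` and `dict`. A simplified sketch of the branching over `resource_list` entries; the `classify` helper and the literal `"name"` key are illustrative, not the project's API:

```python
def classify(res):
    """Mirror the patch's branching over resource_list entries (simplified)."""
    if isinstance(res, str):
        # Plain strings are treated as resource paths.
        return "path"
    elif isinstance(res, dict) and "name" in res:
        # Dicts with a name key are the deprecated id-based form,
        # slated for removal in 3.2.0 per the warning above.
        return "deprecated-id"
    return "unknown"


print(classify("scripts/echo-ten.sh"))  # path
```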
diff --git a/src/pydolphinscheduler/examples/task_datax_example.py b/src/pydolphinscheduler/examples/task_datax_example.py
index 6fdf779..d463ff0 100644
--- a/src/pydolphinscheduler/examples/task_datax_example.py
+++ b/src/pydolphinscheduler/examples/task_datax_example.py
@@ -90,5 +90,17 @@ with Workflow(
# You can custom json_template of datax to sync data. This task create a new
# datax job same as task1, transfer record from `first_mysql` to `second_mysql`
task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
+
+ # [start resource_limit]
+ resource_limit = DataX(
+ name="resource_limit",
+ datasource_name="first_mysql",
+ datatarget_name="second_mysql",
+ sql="select id, name, code, description from source_table",
+ target_table="target_table",
+ cpu_quota=1,
+ memory_max=100,
+ )
+ # [end resource_limit]
workflow.run()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_python_example.py b/src/pydolphinscheduler/examples/task_python_example.py
new file mode 100644
index 0000000..553c7a8
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_python_example.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""An example workflow for task python."""
+
+from pydolphinscheduler.core.workflow import Workflow
+from pydolphinscheduler.tasks.python import Python
+
+with Workflow(
+ name="task_python_example",
+) as workflow:
+ task_python = Python(
+ name="task",
+ definition="print('hello world.')",
+ )
+
+ # [start resource_limit]
+ python_resources_limit = Python(
+ name="python_resources_limit",
+ definition="print('hello world.')",
+ cpu_quota=1,
+ memory_max=100,
+ )
+ # [end resource_limit]
+
+ task_python >> python_resources_limit
+ workflow.submit()
+# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_pytorch_example.py b/src/pydolphinscheduler/examples/task_pytorch_example.py
index 8e431d5..a587f53 100644
--- a/src/pydolphinscheduler/examples/task_pytorch_example.py
+++ b/src/pydolphinscheduler/examples/task_pytorch_example.py
@@ -56,5 +56,17 @@ with Workflow(
requirements="requirements.txt",
)
+ # [start resource_limit]
+ pytorch_resources_limit = Pytorch(
+ name="pytorch_resources_limit",
+ script="main.py",
+ script_params="--dry-run --no-cuda",
+ project_path="https://github.com/pytorch/examples#mnist",
+ python_command="/home/anaconda3/envs/pytorch/bin/python3",
+ cpu_quota=1,
+ memory_max=100,
+ )
+ # [end resource_limit]
+
workflow.submit()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/tutorial.py
index 74080b8..9519d8e 100644
--- a/src/pydolphinscheduler/examples/tutorial.py
+++ b/src/pydolphinscheduler/examples/tutorial.py
@@ -52,13 +52,22 @@ with Workflow(
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
+
+ # [start resource_limit]
+ resource_limit = Shell(
+ name="resource_limit",
+ command="echo resource limit",
+ cpu_quota=1,
+ memory_max=100,
+ )
+ # [end resource_limit]
# [end task_declare]
# [start task_relation_declare]
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
- task_union << task_group
+ resource_limit << task_union << task_group
# [end task_relation_declare]
# [start submit_or_run]
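The new relation line chains `<<` so that `resource_limit` depends on `task_union`, which in turn depends on the task group. A toy model of how such chaining can work via `__lshift__`; the real `Task` implements this with `set_upstream`/`set_downstream` and more bookkeeping, so this sketch only illustrates the evaluation order:

```python
class Node:
    def __init__(self, name):
        self.name = name
        self.upstream = []

    def __lshift__(self, other):
        # `a << b` declares "a depends on b"; accept one node or a list.
        others = other if isinstance(other, list) else [other]
        self.upstream.extend(others)
        # Returning the right-hand side makes left-associative chains work:
        # `c << b << group` first runs `c << b` (returns b), then `b << group`.
        return other


task_union = Node("task_union")
resource_limit = Node("resource_limit")
task_group = [Node("task_child_one"), Node("task_child_two")]

resource_limit << task_union << task_group
```

After the chain, `resource_limit` lists `task_union` upstream, and `task_union` lists both children upstream, matching the tutorial's intent.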
diff --git a/src/pydolphinscheduler/models/base.py b/src/pydolphinscheduler/models/base.py
index 2647714..007edec 100644
--- a/src/pydolphinscheduler/models/base.py
+++ b/src/pydolphinscheduler/models/base.py
@@ -43,7 +43,7 @@ class Base:
return f'<{type(self).__name__}: name="{self.name}">'
def __eq__(self, other):
- return type(self) == type(other) and all(
+ return type(self) is type(other) and all(
getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR
)
diff --git a/src/pydolphinscheduler/tasks/datax.py b/src/pydolphinscheduler/tasks/datax.py
index 1dfa89c..47b8b1f 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -20,11 +20,12 @@
from typing import Dict, List, Optional
from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.models.datasource import Datasource
-class CustomDataX(Task):
+class CustomDataX(WorkerResourceMixin, Task):
"""Task CustomDatax object, declare behavior for custom DataX task to dolphinscheduler.
You provider json template for DataX, it can synchronize data according to the template you provided.
@@ -51,9 +52,10 @@ class CustomDataX(Task):
self.custom_config = self.CUSTOM_CONFIG
self.xms = xms
self.xmx = xmx
+ self.add_attr(**kwargs)
-class DataX(Task):
+class DataX(WorkerResourceMixin, Task):
"""Task DataX object, declare behavior for DataX task to dolphinscheduler.
It runs database DataX jobs in multiple SQL engines, such as:
@@ -123,6 +125,7 @@ class DataX(Task):
self.post_statements = post_statements or []
self.xms = xms
self.xmx = xmx
+ self.add_attr(**kwargs)
@property
def source_params(self) -> Dict:
diff --git a/src/pydolphinscheduler/tasks/python.py b/src/pydolphinscheduler/tasks/python.py
index c1b2558..93cec00 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -26,13 +26,14 @@ from typing import Union
from stmdency.extractor import Extractor
from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException
log = logging.getLogger(__file__)
-class Python(Task):
+class Python(WorkerResourceMixin, Task):
"""Task Python object, declare behavior for Python task to dolphinscheduler.
Python task supports two types of parameters for :param:``definition``, and here is an example:
@@ -67,6 +68,7 @@ class Python(Task):
):
self._definition = definition
super().__init__(name, TaskType.PYTHON, *args, **kwargs)
+ self.add_attr(**kwargs)
def _build_exe_str(self) -> str:
"""Build executable string from given definition.
diff --git a/src/pydolphinscheduler/tasks/pytorch.py b/src/pydolphinscheduler/tasks/pytorch.py
index 4767f7e..8ae371f 100644
--- a/src/pydolphinscheduler/tasks/pytorch.py
+++ b/src/pydolphinscheduler/tasks/pytorch.py
@@ -19,6 +19,7 @@
from typing import Optional
from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
from pydolphinscheduler.core.task import Task
@@ -30,7 +31,7 @@ class DEFAULT:
python_command = "${PYTHON_HOME}"
-class Pytorch(Task):
+class Pytorch(WorkerResourceMixin, Task):
"""Task Pytorch object, declare behavior for Pytorch task to dolphinscheduler.
See also: `DolphinScheduler Pytorch Task Plugin
@@ -83,6 +84,7 @@ class Pytorch(Task):
self.python_env_tool = python_env_tool
self.requirements = requirements
self.conda_python_version = conda_python_version
+ self.add_attr(**kwargs)
@property
def other_params(self):
diff --git a/src/pydolphinscheduler/tasks/shell.py b/src/pydolphinscheduler/tasks/shell.py
index 36ec4e8..f6795ca 100644
--- a/src/pydolphinscheduler/tasks/shell.py
+++ b/src/pydolphinscheduler/tasks/shell.py
@@ -18,10 +18,11 @@
"""Task shell."""
from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
from pydolphinscheduler.core.task import Task
-class Shell(Task):
+class Shell(WorkerResourceMixin, Task):
"""Task shell object, declare behavior for shell task to dolphinscheduler.
:param name: A unique, meaningful string for the shell task.
@@ -56,3 +57,4 @@ class Shell(Task):
def __init__(self, name: str, command: str, *args, **kwargs):
self._raw_script = command
super().__init__(name, TaskType.SHELL, *args, **kwargs)
+ self.add_attr(**kwargs)
diff --git a/tests/tasks/test_datax.py b/tests/tasks/test_datax.py
index b174afa..aa884a6 100644
--- a/tests/tasks/test_datax.py
+++ b/tests/tasks/test_datax.py
@@ -173,3 +173,78 @@ def test_resources_local_custom_datax_command_content(
"""Test task CustomDataX json content through the local resource plug-in."""
custom_datax = CustomDataX(**attr)
assert expect == getattr(custom_datax, "json")
+
+
[email protected](
+ "resource_limit",
+ [
+ {"cpu_quota": 1, "memory_max": 10},
+ {"memory_max": 15},
+ {},
+ ],
+)
[email protected](Datasource, "get_task_usage_4j", return_value=TaskUsage(1,
"MYSQL"))
+def test_datax_get_define_cpu_and_memory(mock_datasource, resource_limit):
+ """Test task datax function get_define with resource limit."""
+ code = 123
+ version = 1
+ name = "test_datax_get_define_cpu_and_memory"
+ command = "select name from test_source_table_name_resource_limit"
+ datasource_name = "test_datasource_resource_limit"
+ datatarget_name = "test_datatarget_resource_limit"
+ target_table = "test_target_table_name_resource_limit"
+
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
+ datax = DataX(
+ name,
+ datasource_name,
+ datatarget_name,
+ command,
+ target_table,
+ **resource_limit
+ )
+ assert "cpuQuota" in datax.get_define()
+ assert "memoryMax" in datax.get_define()
+
+ if "cpu_quota" in resource_limit:
+ assert datax.get_define()["cpuQuota"] == resource_limit.get("cpu_quota")
+
+ if "memory_max" in resource_limit:
+ assert datax.get_define()["memoryMax"] == resource_limit.get("memory_max")
+
+
[email protected](
+ "resource_limit",
+ [
+ {"cpu_quota": 1, "memory_max": 10},
+ {"memory_max": 15},
+ {},
+ ],
+)
+def test_custom_datax_get_define_cpu_and_memory(resource_limit):
+ """Test task CustomDataX function get_define with resource limit."""
+ code = 123
+ version = 1
+
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
+ custom_datax = CustomDataX(
+ "test_custom_datax_get_define", "json_template", **resource_limit
+ )
+ assert "cpuQuota" in custom_datax.get_define()
+ assert "memoryMax" in custom_datax.get_define()
+
+ if "cpu_quota" in resource_limit:
+ assert custom_datax.get_define()["cpuQuota"] == resource_limit.get("cpu_quota")
+
+ if "memory_max" in resource_limit:
+ assert custom_datax.get_define()["memoryMax"] == resource_limit.get("memory_max")
diff --git a/tests/tasks/test_python.py b/tests/tasks/test_python.py
index 516513a..e842f90 100644
--- a/tests/tasks/test_python.py
+++ b/tests/tasks/test_python.py
@@ -178,3 +178,33 @@ def test_resources_local_python_command_content(
"""Test task Python definition content through the local resource plug-in."""
python = Python(**attr)
assert expect == getattr(python, "definition")
+
+
[email protected](
+ "resource_limit",
+ [
+ {"cpu_quota": 1, "memory_max": 10},
+ {"memory_max": 15},
+ {},
+ ],
+)
+def test_python_get_define_cpu_and_memory(resource_limit):
+ """Test task python function get_define with resource limit."""
+ code = 123
+ version = 1
+
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
+ python = Python(
+ name="task", definition="print('hello world.')", **resource_limit
+ )
+ assert "cpuQuota" in python.get_define()
+ assert "memoryMax" in python.get_define()
+
+ if "cpu_quota" in resource_limit:
+ assert python.get_define()["cpuQuota"] == resource_limit.get("cpu_quota")
+
+ if "memory_max" in resource_limit:
+ assert python.get_define()["memoryMax"] == resource_limit.get("memory_max")
diff --git a/tests/tasks/test_pytorch.py b/tests/tasks/test_pytorch.py
index c04723e..66c1664 100644
--- a/tests/tasks/test_pytorch.py
+++ b/tests/tasks/test_pytorch.py
@@ -122,3 +122,31 @@ def test_other_params(is_create_environment, project_path, python_command, expect
python_command=python_command,
)
assert task.other_params == expect
+
+
[email protected](
+ "resource_limit",
+ [
+ {"cpu_quota": 1, "memory_max": 10},
+ {"memory_max": 15},
+ {},
+ ],
+)
+def test_pytorch_get_define_cpu_and_memory(resource_limit):
+ """Test task pytorch function get_define with resource limit."""
+ code = 123
+ version = 1
+
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
+ pytorch = Pytorch(name="test", script="", script_params="", **resource_limit)
+ assert "cpuQuota" in pytorch.get_define()
+ assert "memoryMax" in pytorch.get_define()
+
+ if "cpu_quota" in resource_limit:
+ assert pytorch.get_define()["cpuQuota"] == resource_limit.get("cpu_quota")
+
+ if "memory_max" in resource_limit:
+ assert pytorch.get_define()["memoryMax"] == resource_limit.get("memory_max")
diff --git a/tests/tasks/test_shell.py b/tests/tasks/test_shell.py
index 3932c08..e768a2d 100644
--- a/tests/tasks/test_shell.py
+++ b/tests/tasks/test_shell.py
@@ -112,3 +112,33 @@ def test_resources_local_shell_command_content(
"""Test task shell command content through the local resource plug-in."""
task = Shell(**attr)
assert expect == getattr(task, "raw_script")
+
+
[email protected](
+ "resource_limit",
+ [
+ {"cpu_quota": 1, "memory_max": 10},
+ {"memory_max": 15},
+ {},
+ ],
+)
+def test_shell_get_define_cpu_and_memory(resource_limit):
+ """Test task shell function get_define with resource limit."""
+ code = 123
+ version = 1
+ name = "test_shell_get_define_cpu_and_memory"
+ command = "echo test shell with resource limit"
+
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
+ shell = Shell(name, command, **resource_limit)
+ assert "cpuQuota" in shell.get_define()
+ assert "memoryMax" in shell.get_define()
+
+ if "cpu_quota" in resource_limit:
+ assert shell.get_define()["cpuQuota"] == resource_limit.get("cpu_quota")
+
+ if "memory_max" in resource_limit:
+ assert shell.get_define()["memoryMax"] == resource_limit.get("memory_max")
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 998f711..07a3249 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -24,7 +24,6 @@ import os
task_without_example = {
"http",
"sub_workflow",
- "python",
"procedure",
}