This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 48586a3  Add WorkerResourceMixin better way to set CPU quotas and max 
memory (#110)
48586a3 is described below

commit 48586a3702df3f708044c8c03eedd442f21cf24f
Author: sofyc <[email protected]>
AuthorDate: Thu Oct 26 01:36:32 2023 -0700

    Add WorkerResourceMixin better way to set CPU quotas and max memory (#110)
---
 docs/source/concept.rst                            | 36 +++++------
 docs/source/start.rst                              | 62 +++++++++---------
 docs/source/tasks/datax.rst                        | 10 +++
 docs/source/tasks/python.rst                       | 21 ++++++
 docs/source/tasks/pytorch.rst                      | 11 ++++
 docs/source/tasks/shell.rst                        |  9 +++
 docs/source/tutorial.rst                           | 22 +++----
 setup.cfg                                          |  4 +-
 src/pydolphinscheduler/core/mixin.py               | 39 +++++++++++
 src/pydolphinscheduler/core/parameter.py           |  2 +-
 src/pydolphinscheduler/core/task.py                |  4 +-
 .../examples/task_datax_example.py                 | 12 ++++
 .../examples/task_python_example.py                | 43 +++++++++++++
 .../examples/task_pytorch_example.py               | 12 ++++
 src/pydolphinscheduler/examples/tutorial.py        | 11 +++-
 src/pydolphinscheduler/models/base.py              |  2 +-
 src/pydolphinscheduler/tasks/datax.py              |  7 +-
 src/pydolphinscheduler/tasks/python.py             |  4 +-
 src/pydolphinscheduler/tasks/pytorch.py            |  4 +-
 src/pydolphinscheduler/tasks/shell.py              |  4 +-
 tests/tasks/test_datax.py                          | 75 ++++++++++++++++++++++
 tests/tasks/test_python.py                         | 30 +++++++++
 tests/tasks/test_pytorch.py                        | 28 ++++++++
 tests/tasks/test_shell.py                          | 30 +++++++++
 tests/testing/constants.py                         |  1 -
 25 files changed, 411 insertions(+), 72 deletions(-)

diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index ab8df5e..4164d78 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -23,10 +23,10 @@ In this section, you would know the core concepts of 
*PyDolphinScheduler*.
 Workflow
 --------
 
-Workflow describe the whole things except `tasks`_ and `tasks dependence`_, 
which including
+Workflow describes the whole things except `tasks`_ and `tasks dependence`_, 
which includes
 name, schedule interval, schedule start time and end time. You would know 
scheduler 
 
-Workflow could be initialized in normal assign statement or in context manger.
+Workflow could be initialized in a normal assignment statement or within a 
context manger.
 
 .. code-block:: python
 
@@ -37,10 +37,10 @@ Workflow could be initialized in normal assign statement or 
in context manger.
    with Workflow(name="my first workflow") as workflow:
        workflow.submit()
 
-Workflow is the main object communicate between *PyDolphinScheduler* and 
DolphinScheduler daemon.
-After workflow and task is be declared, you could use `submit` and `run` 
notify server your definition.
+Workflow is the main object communicating between *PyDolphinScheduler* and 
DolphinScheduler daemon.
+After workflow and task is declared, you could use `submit` and `run` to 
notify server your definition.
 
-If you just want to submit your definition and create workflow, without run 
it, you should use attribute `submit`.
+If you just want to submit your definition and create workflow, without 
running it, you should use attribute `submit`.
 But if you want to run the workflow after you submit it, you could use 
attribute `run`.
 
 .. code-block:: python
@@ -54,8 +54,8 @@ But if you want to run the workflow after you submit it, you 
could use attribute
 Schedule
 ~~~~~~~~
 
-We use parameter `schedule` determine the schedule interval of workflow, 
*PyDolphinScheduler* support seven
-asterisks expression, and each of the meaning of position as below
+We use parameter `schedule` to determine the schedule interval of workflow, 
*PyDolphinScheduler* supports seven
+asterisks expression, and each of the meaning of position is as below
 
 .. code-block:: text
 
@@ -136,8 +136,8 @@ Alert is the way to notify user when workflow instance is 
success or failed. We
 Tasks
 -----
 
-Task is the minimum unit running actual job, and it is nodes of DAG, aka 
directed acyclic graph. You could define
-what you want to in the task. It have some required parameter to make 
uniqueness and definition.
+Task is the minimum unit running actual job, and it is a node of DAG, aka 
directed acyclic graph. You could define
+what you want in the task. It has some required parameters to make uniqueness 
and definition.
 
 Here we use :py:meth:`pydolphinscheduler.tasks.Shell` as example, parameter 
`name` and `command` is required and must be provider. Parameter
 `name` set name to the task, and parameter `command` declare the command you 
wish to run in this task.
@@ -147,15 +147,15 @@ Here we use :py:meth:`pydolphinscheduler.tasks.Shell` as 
example, parameter `nam
    # We named this task as "shell", and just run command `echo shell task`
    shell_task = Shell(name="shell", command="echo shell task")
 
-If you want to see all type of tasks, you could see :doc:`tasks/index`.
+If you want to see all types of tasks, you could see :doc:`tasks/index`.
 
 Tasks Dependence
 ~~~~~~~~~~~~~~~~
 
-You could define many tasks in on single `Workflow`_. If all those task is in 
parallel processing,
-then you could leave them alone without adding any additional information. But 
if there have some tasks should
-not be run unless pre task in workflow have be done, we should set task 
dependence to them. Set tasks dependence
-have two mainly way and both of them is easy. You could use bitwise operator 
`>>` and `<<`, or task attribute 
+You could define many tasks in on single `Workflow`_. If all those tasks are 
in parallel processing,
+then you could leave them alone without adding any additional information. But 
if there are some tasks that should
+not be run unless pre task in workflow has been done, we should set task 
dependence to them. Set task dependence
+have two main ways and both of them are easy. You could use bitwise operator 
`>>` and `<<`, or task attribute 
 `set_downstream` and `set_upstream` to do it.
 
 .. code-block:: python
@@ -178,7 +178,7 @@ have two mainly way and both of them is easy. You could use 
bitwise operator `>>
 Task With Workflow
 ~~~~~~~~~~~~~~~~~~
 
-In most of data orchestration cases, you should assigned attribute `workflow` 
to task instance to
+In most of data orchestration cases, you should assign attribute `workflow` to 
task instance to
 decide workflow of task. You could set `workflow` in both normal assign or in 
context manger mode
 
 .. code-block:: python
@@ -232,13 +232,13 @@ Resource Files
 --------------
 
 During workflow running, we may need some resource files to help us run task 
usually. One of a common situation
-is that we already have some executable files locally, and we need to schedule 
in specific time, or add them
-to existing workflow by adding the new tasks. Of cause, we can upload those 
files to target machine and run them
+is that we already have some executable files locally, and we need to schedule 
a specific time, or add them
+to existing workflow by adding the new tasks. Of course, we can upload those 
files to target machine and run them
 in :doc:`shell task <tasks/shell>` by reference the absolute path of file. But 
if we have more than one machine
 to run task, we have to upload those files to each of them. And it is not 
convenient and not flexible, because
 we may need to change our resource files sometimes.
 
-The more pydolphinscheduler way is to upload those files together with 
`workflow`_, and use them in task to run.
+One more pydolphinscheduler way is to upload those files together with 
`workflow`_, and use them in task to run.
 For example, you have a bash script named ``echo-ten.sh`` locally, and it 
contains some code like this:
 
 .. code-block:: bash
diff --git a/docs/source/start.rst b/docs/source/start.rst
index 434d80e..3c41471 100644
--- a/docs/source/start.rst
+++ b/docs/source/start.rst
@@ -18,7 +18,7 @@
 Getting Started
 ===============
 
-To get started with *PyDolphinScheduler* you must ensure python and pip
+To get started with *PyDolphinScheduler* you must ensure python and pip are
 installed on your machine, if you're already set up, you can skip straight
 to `Installing PyDolphinScheduler`_, otherwise please continue with
 `Installing Python`_.
@@ -28,16 +28,16 @@ Installing Python
 
 How to install `python` and `pip` depends on what operating system
 you're using. The python wiki provides up to date
-`instructions for all platforms here`_. When you entering the website
-and choice your operating system, you would be offered the choice and
-select python version. *PyDolphinScheduler* recommend use version above
-Python 3.6 and we highly recommend you install *Stable Releases* instead
+`instructions for all platforms here`_. When you enter the website
+and choose your operating system, you would be offered the choice and
+select python version. *PyDolphinScheduler* recommends using a version above
+Python 3.6 and we highly recommend installing *Stable Releases* instead
 of *Pre-releases*.
 
 After you have download and installed Python, you should open your terminal,
-typing and running :code:`python --version` to check whether the installation
-is correct or not. If all thing good, you could see the version in console
-without error(here is a example after Python 3.8.7 installed)
+type and run :code:`python --version` to check whether the installation
+is correct or not. If everything is good, you could see the version in console
+without error(here is an example after Python 3.8.7 is installed)
 
 .. code-block:: bash
 
@@ -49,21 +49,21 @@ Installing PyDolphinScheduler
 -----------------------------
 
 After Python is already installed on your machine following section
-`installing Python`_, it easy to *PyDolphinScheduler* by pip.
+`installing Python`_, it is easy to install *PyDolphinScheduler* using pip.
 
 .. code-block:: bash
 
    python -m pip install apache-dolphinscheduler
 
-The latest version of *PyDolphinScheduler* would be installed after you run 
above
+The latest version of *PyDolphinScheduler* would be installed after you run 
the above
 command in your terminal. You could go and `start Python Gateway Service`_ to 
finish
-the prepare, and then go to :doc:`tutorial` to make your hand dirty. But if you
+the preparation, and then go to :doc:`tutorial` to get your hand dirty. But if 
you
 want to install the unreleased version of *PyDolphinScheduler*, you could go 
and see
-section `installing PyDolphinScheduler in dev branch`_ for more detail.
+section `installing PyDolphinScheduler in dev branch`_ for more details.
 
 .. note::
 
-   Currently, we released multiple pre-release package in PyPI, you can see 
all released package
+   Currently, we have released multiple pre-release packages in PyPI, you can 
see all released packages
    including pre-release in `release history 
<https://pypi.org/project/apache-dolphinscheduler/#history>`_.
    You can fix the the package version if you want to install pre-release 
package, for example if
    you want to install version `3.0.0-beta-2` package, you can run command
@@ -72,9 +72,9 @@ section `installing PyDolphinScheduler in dev branch`_ for 
more detail.
 Installing PyDolphinScheduler In DEV Branch
 -------------------------------------------
 
-Because the project is developing and some of the features still not release.
-If you want to try some thing unreleased you could install from the source code
-which we hold in GitHub
+Because the project is developing and some of the features are still not 
released.
+If you want to try something unreleased you could install from the source code
+which we hold on GitHub
 
 .. code-block:: bash
 
@@ -84,10 +84,10 @@ which we hold in GitHub
    python -m pip install -e .
 
 After you installed *PyDolphinScheduler*, please remember `start Python 
Gateway Service`_
-which waiting for *PyDolphinScheduler*'s workflow definition require.
+which is required for *PyDolphinScheduler*'s workflow definition.
 
-Above command will clone whole dolphinscheduler source code to local, maybe 
you want to install latest pydolphinscheduler
-package directly and do not care about other code(including Python gateway 
service code), you can execute command
+Above command will clone whole dolphinscheduler source code to local, maybe 
you want to install the latest pydolphinscheduler
+package directly and do not care about other code(including Python gateway 
service code), you can execute the command
 
 .. code-block:: bash
 
@@ -98,10 +98,10 @@ Start Python Gateway Service
 ----------------------------
 
 Since **PyDolphinScheduler** is Python API for `Apache DolphinScheduler`_, it
-could define workflow and tasks structure, but could not run it unless you
-`install Apache DolphinScheduler`_ and start its API server which including
-Python gateway service in it. We only and some key steps here and you could
-go `install Apache DolphinScheduler`_ for more detail
+could define workflow and task structures, but could not run it unless you
+`install Apache DolphinScheduler`_ and start its API server which includes
+Python gateway service in it. We only write some key steps here and you could
+go `install Apache DolphinScheduler`_ for more details
 
 .. code-block:: bash
 
@@ -109,7 +109,7 @@ go `install Apache DolphinScheduler`_ for more detail
    ./bin/dolphinscheduler-daemon.sh start api-server
 
 To check whether the server is alive or not, you could run :code:`jps`. And
-the server is health if keyword `ApiApplicationServer` in the console.
+the server is healthy if keyword `ApiApplicationServer` is in the console.
 
 .. code-block:: bash
 
@@ -120,14 +120,14 @@ the server is health if keyword `ApiApplicationServer` in 
the console.
 
 .. note::
 
-   Please make sure you already enabled started Python gateway service along 
with `api-server`. The configuration is in
+   Please make sure you already started Python gateway service along with 
`api-server`. The configuration is in
    yaml config path `python-gateway.enabled : true` in api-server's 
configuration path in `api-server/conf/application.yaml`.
-   The default value is true and Python gateway service start when api server 
is been started.
+   The default value is true and Python gateway service starts when api server 
is started.
 
 Run an Example
 --------------
 
-Before run an example for pydolphinscheduler, you should get the example code 
from it source code. You could run
+Before run an example for pydolphinscheduler, you should get the example code 
from its source code. You could run
 single bash command to get it
 
 .. code-block:: bash
@@ -156,19 +156,19 @@ from the API server, you should first change 
pydolphinscheduler configuration an
 
 After that, you could go and see your DolphinScheduler web UI to find out a 
new workflow created by pydolphinscheduler,
 and the path of web UI is `Project -> Workflow -> Workflow Definition`, and 
you can see a workflow and workflow instance
-had been created and DAG is auto formatter by web UI.
+had been created and DAG is automatically formatted by web UI.
 
 .. note::
 
-   We have default authentication token when in first launch dolphinscheduler 
and pydolphinscheduler. Please change
+   We have default authentication token when you first launch dolphinscheduler 
and pydolphinscheduler. Please change
    the parameter ``auth_token`` when you deploy in production environment or 
test dolphinscheduler in public network.
-   See :ref:`authentication token <concept:authentication token>` for more 
detail.
+   See :ref:`authentication token <concept:authentication token>` for more 
details.
 
 
 What's More
 -----------
 
-If you do not familiar with *PyDolphinScheduler*, you could go to 
:doc:`tutorial` and see how it works. But
+If you are not familiar with *PyDolphinScheduler*, you could go to 
:doc:`tutorial` and see how it works. But
 if you already know the basic usage or concept of *PyDolphinScheduler*, you 
could go and play with all
 :doc:`tasks/index` *PyDolphinScheduler* supports, or see our 
:doc:`howto/index` about useful cases.
 
diff --git a/docs/source/tasks/datax.rst b/docs/source/tasks/datax.rst
index cb67a2f..f704e3f 100644
--- a/docs/source/tasks/datax.rst
+++ b/docs/source/tasks/datax.rst
@@ -27,6 +27,16 @@ Example
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
 
+Resource Limit Example
+----------------------
+
+We can add resource limit like CPU quota and max memory by passing parameters 
when declaring tasks.
+
+.. literalinclude:: 
../../../src/pydolphinscheduler/examples/task_datax_example.py
+   :start-after: [start resource_limit]
+   :end-before: [end resource_limit]
+
+
 Dive Into
 ---------
 
diff --git a/docs/source/tasks/python.rst b/docs/source/tasks/python.rst
index 1bf6210..40b0c6d 100644
--- a/docs/source/tasks/python.rst
+++ b/docs/source/tasks/python.rst
@@ -18,6 +18,27 @@
 Python
 ======
 
+A Python task type's example and dive into information of 
**PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: 
../../../src/pydolphinscheduler/examples/task_python_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Resource Limit Example
+----------------------
+
+We can add resource limit like CPU quota and max memory by passing parameters 
when declaring tasks.
+
+.. literalinclude:: 
../../../src/pydolphinscheduler/examples/task_python_example.py
+   :start-after: [start resource_limit]
+   :end-before: [end resource_limit]
+
+Dive Into
+---------
+
 .. automodule:: pydolphinscheduler.tasks.python
 
 
diff --git a/docs/source/tasks/pytorch.rst b/docs/source/tasks/pytorch.rst
index 4c7a552..7e3c034 100644
--- a/docs/source/tasks/pytorch.rst
+++ b/docs/source/tasks/pytorch.rst
@@ -28,6 +28,17 @@ Example
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
 
+
+Resource Limit Example
+----------------------
+
+We can add resource limit like CPU quota and max memory by passing parameters 
when declaring tasks.
+
+.. literalinclude:: 
../../../src/pydolphinscheduler/examples/task_pytorch_example.py
+   :start-after: [start resource_limit]
+   :end-before: [end resource_limit]
+
+
 Dive Into
 ---------
 
diff --git a/docs/source/tasks/shell.rst b/docs/source/tasks/shell.rst
index 2dd106a..13074fc 100644
--- a/docs/source/tasks/shell.rst
+++ b/docs/source/tasks/shell.rst
@@ -27,6 +27,15 @@ Example
    :start-after: [start workflow_declare]
    :end-before: [end task_relation_declare]
 
+Resource Limit Example
+----------------------
+
+We can add resource limit like CPU quota and max memory by passing parameters 
when declaring tasks.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial.py
+   :start-after: [start resource_limit]
+   :end-before: [end resource_limit]
+
 Dive Into
 ---------
 
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index 695c945..259e3e7 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -20,21 +20,21 @@ Tutorial
 
 This tutorial shows you the basic concept of *PyDolphinScheduler* and tells all
 things you should know before you submit or run your first workflow. If you
-still have not installed *PyDolphinScheduler* and start DolphinScheduler, you
-could go and see :ref:`how to getting start PyDolphinScheduler <start:getting 
started>` firstly.
+still have not installed *PyDolphinScheduler* and started DolphinScheduler, you
+could go and see :ref:`how to get started with PyDolphinScheduler 
<start:getting started>` firstly.
 
 Overview of Tutorial
 --------------------
 
-Here have an overview of our tutorial, and it looks a little complex but does 
not
-worry about that because we explain this example below as detail as possible.
+Here, we have an overview of our tutorial, and it looks a little complex but 
don't
+worry about that because we will explain this example below in as much detail 
as possible.
 
 There are two types of tutorials: traditional and task decorator.
 
 - **Traditional Way**: More general, support many :doc:`built-in task types 
<tasks/index>`, it is convenient
   when you build your workflow at the beginning.
-- **Task Decorator**: A Python decorator allow you to wrap your function into 
pydolphinscheduler's task. Less
-  versatility to the traditional way because it only supported Python 
functions and without build-in tasks
+- **Task Decorator**: A Python decorator that allows you to wrap your function 
into pydolphinscheduler's task. Less
+  versatility to the traditional way because it only supports Python functions 
without build-in tasks
   supported. But it is helpful if your workflow is all built with Python or if 
you already have some Python
   workflow code and want to migrate them to pydolphinscheduler.
 - **YAML File**: We can use pydolphinscheduler CLI to create workflow using 
YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`. 
@@ -94,7 +94,7 @@ We should instantiate 
:class:`pydolphinscheduler.core.workflow.Workflow` object
 import them from `import necessary module`_. Here we declare basic arguments 
for workflow.
 We define the name of :code:`Workflow`, using `Python context manager`_ and it 
**the only required argument**
 for `Workflow`. Besides, we also declare three arguments named 
:code:`schedule` and :code:`start_time`
-which setting workflow schedule interval and schedule start_time, and argument 
:code:`tenant` defines which tenant
+which sets workflow schedule interval and schedule start_time, and argument 
:code:`tenant` defines which tenant
 will be running this task in the DolphinScheduler worker. See :ref:`section 
tenant <concept:tenant>` in
 *PyDolphinScheduler* :doc:`concept` for more information.
 
@@ -119,7 +119,7 @@ will be running this task in the DolphinScheduler worker. 
See :ref:`section tena
       :end-before: # Define the tasks within the workflow
       :language: yaml
 
-We could find more detail about :code:`Workflow` in :ref:`concept about 
workflow <concept:workflow>`
+We could find more details about :code:`Workflow` in :ref:`concept about 
workflow <concept:workflow>`
 if you are interested in it. For all arguments of object workflow, you could 
find in the
 :class:`pydolphinscheduler.core.workflow` API documentation.
 
@@ -128,7 +128,7 @@ Task Declaration
 
 .. tab:: Tradition
 
-   We declare four tasks to show how to create tasks, and both of them are 
simple tasks of
+   We declare four tasks to show how to create tasks, and all of them are 
simple tasks of
    :class:`pydolphinscheduler.tasks.shell` which runs `echo` command in the 
terminal. Besides the argument
    `command` with :code:`echo` command, we also need to set the argument 
`name` for each task
    *(not only shell task, `name` is required for each type of task)*.
@@ -142,7 +142,7 @@ Task Declaration
 
 .. tab:: Task Decorator
 
-   We declare four tasks to show how to create tasks, and both of them are 
created by the task decorator which
+   We declare four tasks to show how to create tasks, and all of them are 
created by the task decorator which
    using :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is 
add a decorator named
    :code:`@task` to existing Python function, and then use them inside 
:class:`pydolphinscheduler.core.workflow`
 
@@ -151,7 +151,7 @@ Task Declaration
       :start-after: [start task_declare]
       :end-before: [end task_declare]
 
-   It makes our workflow more Pythonic, but be careful that when we use task 
decorator mode mean we only use
+   It makes our workflow more Pythonic, but be careful that when we use task 
decorator mode, it means we only use
    Python function as a task and could not use the :doc:`built-in tasks 
<tasks/index>` most of the cases.
 
 .. tab:: YAML File
diff --git a/setup.cfg b/setup.cfg
index f8f28d7..cca6044 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -182,7 +182,9 @@ ignore =
     D105,
     # Conflict to Black
     # W503: Line breaks before binary operators
-    W503
+    W503,
+    # D400: First line should end with a period
+    D400
 per-file-ignores =
     */pydolphinscheduler/side/__init__.py:F401
     */pydolphinscheduler/tasks/__init__.py:F401
diff --git a/src/pydolphinscheduler/core/mixin.py 
b/src/pydolphinscheduler/core/mixin.py
new file mode 100644
index 0000000..1cf35b3
--- /dev/null
+++ b/src/pydolphinscheduler/core/mixin.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""WorkerResource Mixin"""
+
+
+class WorkerResourceMixin:
+    """Mixin object, declare some attributes for WorkerResource."""
+
+    def add_attr(self, **kwargs):
+        """Add attributes to WorkerResource, include cpu_quota and memory_max 
now."""
+        self._cpu_quota = kwargs.get("cpu_quota", -1)
+        self._memory_max = kwargs.get("memory_max", -1)
+        if hasattr(self, "_DEFINE_ATTR"):
+            self._DEFINE_ATTR |= {"cpu_quota", "memory_max"}
+
+    @property
+    def cpu_quota(self):
+        """Get cpu_quota."""
+        return self._cpu_quota
+
+    @property
+    def memory_max(self):
+        """Get memory_max."""
+        return self._memory_max
diff --git a/src/pydolphinscheduler/core/parameter.py 
b/src/pydolphinscheduler/core/parameter.py
index 6bac357..15ea723 100644
--- a/src/pydolphinscheduler/core/parameter.py
+++ b/src/pydolphinscheduler/core/parameter.py
@@ -49,7 +49,7 @@ class BaseDataType:
 
     def __eq__(self, data):
         return (
-            type(self) == type(data)
+            type(self) is type(data)
             and self.data_type == data.data_type
             and self.value == data.value
         )
diff --git a/src/pydolphinscheduler/core/task.py 
b/src/pydolphinscheduler/core/task.py
index 9ec53f0..9f6b644 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -273,13 +273,13 @@ class Task(Base):
         """Get task define attribute `resource_list`."""
         resources = set()
         for res in self._resource_list:
-            if type(res) == str:
+            if isinstance(res, str):
                 resources.add(
                     Resource(
                         name=res, user_name=self.user_name
                     ).get_fullname_from_database()
                 )
-            elif type(res) == dict and ResourceKey.NAME in res:
+            elif isinstance(res, dict) and ResourceKey.NAME in res:
                 warnings.warn(
                     """`resource_list` should be defined using List[str] with 
resource paths,
                        the use of ids to define resources will be remove in 
version 3.2.0.
diff --git a/src/pydolphinscheduler/examples/task_datax_example.py 
b/src/pydolphinscheduler/examples/task_datax_example.py
index 6fdf779..d463ff0 100644
--- a/src/pydolphinscheduler/examples/task_datax_example.py
+++ b/src/pydolphinscheduler/examples/task_datax_example.py
@@ -90,5 +90,17 @@ with Workflow(
     # You can custom json_template of datax to sync data. This task create a 
new
     # datax job same as task1, transfer record from `first_mysql` to 
`second_mysql`
     task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
+
+    # [start resource_limit]
+    resource_limit = DataX(
+        name="resource_limit",
+        datasource_name="first_mysql",
+        datatarget_name="second_mysql",
+        sql="select id, name, code, description from source_table",
+        target_table="target_table",
+        cpu_quota=1,
+        memory_max=100,
+    )
+    # [end resource_limit]
     workflow.run()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_python_example.py 
b/src/pydolphinscheduler/examples/task_python_example.py
new file mode 100644
index 0000000..553c7a8
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_python_example.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""An example workflow for task python."""
+
+from pydolphinscheduler.core.workflow import Workflow
+from pydolphinscheduler.tasks.python import Python
+
+with Workflow(
+    name="task_python_example",
+) as workflow:
+    task_python = Python(
+        name="task",
+        definition="print('hello world.')",
+    )
+
+    # [start resource_limit]
+    python_resources_limit = Python(
+        name="python_resources_limit",
+        definition="print('hello world.')",
+        cpu_quota=1,
+        memory_max=100,
+    )
+    # [end resource_limit]
+
+    task_python >> python_resources_limit
+    workflow.submit()
+# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_pytorch_example.py 
b/src/pydolphinscheduler/examples/task_pytorch_example.py
index 8e431d5..a587f53 100644
--- a/src/pydolphinscheduler/examples/task_pytorch_example.py
+++ b/src/pydolphinscheduler/examples/task_pytorch_example.py
@@ -56,5 +56,17 @@ with Workflow(
         requirements="requirements.txt",
     )
 
+    # [start resource_limit]
+    pytorch_resources_limit = Pytorch(
+        name="pytorch_resources_limit",
+        script="main.py",
+        script_params="--dry-run --no-cuda",
+        project_path="https://github.com/pytorch/examples#mnist";,
+        python_command="/home/anaconda3/envs/pytorch/bin/python3",
+        cpu_quota=1,
+        memory_max=100,
+    )
+    # [end resource_limit]
+
     workflow.submit()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/tutorial.py 
b/src/pydolphinscheduler/examples/tutorial.py
index 74080b8..9519d8e 100644
--- a/src/pydolphinscheduler/examples/tutorial.py
+++ b/src/pydolphinscheduler/examples/tutorial.py
@@ -52,13 +52,22 @@ with Workflow(
     task_child_one = Shell(name="task_child_one", command="echo 'child one'")
     task_child_two = Shell(name="task_child_two", command="echo 'child two'")
     task_union = Shell(name="task_union", command="echo union")
+
+    # [start resource_limit]
+    resource_limit = Shell(
+        name="resource_limit",
+        command="echo resource limit",
+        cpu_quota=1,
+        memory_max=100,
+    )
+    # [end resource_limit]
     # [end task_declare]
 
     # [start task_relation_declare]
     task_group = [task_child_one, task_child_two]
     task_parent.set_downstream(task_group)
 
-    task_union << task_group
+    resource_limit << task_union << task_group
     # [end task_relation_declare]
 
     # [start submit_or_run]
diff --git a/src/pydolphinscheduler/models/base.py 
b/src/pydolphinscheduler/models/base.py
index 2647714..007edec 100644
--- a/src/pydolphinscheduler/models/base.py
+++ b/src/pydolphinscheduler/models/base.py
@@ -43,7 +43,7 @@ class Base:
         return f'<{type(self).__name__}: name="{self.name}">'
 
     def __eq__(self, other):
-        return type(self) == type(other) and all(
+        return type(self) is type(other) and all(
             getattr(self, a, None) == getattr(other, a, None) for a in 
self._KEY_ATTR
         )
 
diff --git a/src/pydolphinscheduler/tasks/datax.py 
b/src/pydolphinscheduler/tasks/datax.py
index 1dfa89c..47b8b1f 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -20,11 +20,12 @@
 from typing import Dict, List, Optional
 
 from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.models.datasource import Datasource
 
 
-class CustomDataX(Task):
+class CustomDataX(WorkerResourceMixin, Task):
     """Task CustomDatax object, declare behavior for custom DataX task to 
dolphinscheduler.
 
     You provider json template for DataX, it can synchronize data according to 
the template you provided.
@@ -51,9 +52,10 @@ class CustomDataX(Task):
         self.custom_config = self.CUSTOM_CONFIG
         self.xms = xms
         self.xmx = xmx
+        self.add_attr(**kwargs)
 
 
-class DataX(Task):
+class DataX(WorkerResourceMixin, Task):
     """Task DataX object, declare behavior for DataX task to dolphinscheduler.
 
     It should run database datax job in multiply sql link engine, such as:
@@ -123,6 +125,7 @@ class DataX(Task):
         self.post_statements = post_statements or []
         self.xms = xms
         self.xmx = xmx
+        self.add_attr(**kwargs)
 
     @property
     def source_params(self) -> Dict:
diff --git a/src/pydolphinscheduler/tasks/python.py 
b/src/pydolphinscheduler/tasks/python.py
index c1b2558..93cec00 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -26,13 +26,14 @@ from typing import Union
 from stmdency.extractor import Extractor
 
 from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
 
 log = logging.getLogger(__file__)
 
 
-class Python(Task):
+class Python(WorkerResourceMixin, Task):
     """Task Python object, declare behavior for Python task to 
dolphinscheduler.
 
     Python task support two types of parameters for :param:``definition``, and 
here is an example:
@@ -67,6 +68,7 @@ class Python(Task):
     ):
         self._definition = definition
         super().__init__(name, TaskType.PYTHON, *args, **kwargs)
+        self.add_attr(**kwargs)
 
     def _build_exe_str(self) -> str:
         """Build executable string from given definition.
diff --git a/src/pydolphinscheduler/tasks/pytorch.py 
b/src/pydolphinscheduler/tasks/pytorch.py
index 4767f7e..8ae371f 100644
--- a/src/pydolphinscheduler/tasks/pytorch.py
+++ b/src/pydolphinscheduler/tasks/pytorch.py
@@ -19,6 +19,7 @@
 from typing import Optional
 
 from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
 from pydolphinscheduler.core.task import Task
 
 
@@ -30,7 +31,7 @@ class DEFAULT:
     python_command = "${PYTHON_HOME}"
 
 
-class Pytorch(Task):
+class Pytorch(WorkerResourceMixin, Task):
     """Task Pytorch object, declare behavior for Pytorch task to 
dolphinscheduler.
 
     See also: `DolphinScheduler Pytorch Task Plugin
@@ -83,6 +84,7 @@ class Pytorch(Task):
         self.python_env_tool = python_env_tool
         self.requirements = requirements
         self.conda_python_version = conda_python_version
+        self.add_attr(**kwargs)
 
     @property
     def other_params(self):
diff --git a/src/pydolphinscheduler/tasks/shell.py 
b/src/pydolphinscheduler/tasks/shell.py
index 36ec4e8..f6795ca 100644
--- a/src/pydolphinscheduler/tasks/shell.py
+++ b/src/pydolphinscheduler/tasks/shell.py
@@ -18,10 +18,11 @@
 """Task shell."""
 
 from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
 from pydolphinscheduler.core.task import Task
 
 
-class Shell(Task):
+class Shell(WorkerResourceMixin, Task):
     """Task shell object, declare behavior for shell task to dolphinscheduler.
 
     :param name: A unique, meaningful string for the shell task.
@@ -56,3 +57,4 @@ class Shell(Task):
     def __init__(self, name: str, command: str, *args, **kwargs):
         self._raw_script = command
         super().__init__(name, TaskType.SHELL, *args, **kwargs)
+        self.add_attr(**kwargs)
diff --git a/tests/tasks/test_datax.py b/tests/tasks/test_datax.py
index b174afa..aa884a6 100644
--- a/tests/tasks/test_datax.py
+++ b/tests/tasks/test_datax.py
@@ -173,3 +173,78 @@ def test_resources_local_custom_datax_command_content(
     """Test task CustomDataX json content through the local resource 
plug-in."""
     custom_datax = CustomDataX(**attr)
     assert expect == getattr(custom_datax, "json")
+
+
[email protected](
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
[email protected](Datasource, "get_task_usage_4j", return_value=TaskUsage(1, 
"MYSQL"))
+def test_datax_get_define_cpu_and_memory(mock_datasource, resource_limit):
+    """Test task datax function get_define with resource limit."""
+    code = 123
+    version = 1
+    name = "test_datax_get_define_cpu_and_memory"
+    command = "select name from test_source_table_name_resource_limit"
+    datasource_name = "test_datasource_resource_limit"
+    datatarget_name = "test_datatarget_resource_limit"
+    target_table = "test_target_table_name_resource_limit"
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        datax = DataX(
+            name,
+            datasource_name,
+            datatarget_name,
+            command,
+            target_table,
+            **resource_limit
+        )
+        assert "cpuQuota" in datax.get_define()
+        assert "memoryMax" in datax.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert datax.get_define()["cpuQuota"] == 
resource_limit.get("cpu_quota")
+
+        if "memoryMax" in resource_limit:
+            assert datax.get_define()["memoryMax"] == 
resource_limit.get("memory_max")
+
+
[email protected](
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
+def test_custom_datax_get_define_cpu_and_memory(resource_limit):
+    """Test custom datax shell function get_define with resource limit."""
+    code = 123
+    version = 1
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        custom_datax = CustomDataX(
+            "test_custom_datax_get_define", "json_template", **resource_limit
+        )
+        assert "cpuQuota" in custom_datax.get_define()
+        assert "memoryMax" in custom_datax.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert custom_datax.get_define()["cpuQuota"] == resource_limit.get(
+                "cpu_quota"
+            )
+
+        if "memoryMax" in resource_limit:
+            assert custom_datax.get_define()["memoryMax"] == 
resource_limit.get(
+                "memory_max"
+            )
diff --git a/tests/tasks/test_python.py b/tests/tasks/test_python.py
index 516513a..e842f90 100644
--- a/tests/tasks/test_python.py
+++ b/tests/tasks/test_python.py
@@ -178,3 +178,33 @@ def test_resources_local_python_command_content(
     """Test task Python definition content through the local resource 
plug-in."""
     python = Python(**attr)
     assert expect == getattr(python, "definition")
+
+
[email protected](
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
+def test_python_get_define_cpu_and_memory(resource_limit):
+    """Test task python function get_define with resource limit."""
+    code = 123
+    version = 1
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        python = Python(
+            name="task", definition="print('hello world.')", **resource_limit
+        )
+        assert "cpuQuota" in python.get_define()
+        assert "memoryMax" in python.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert python.get_define()["cpuQuota"] == 
resource_limit.get("cpu_quota")
+
+        if "memoryMax" in resource_limit:
+            assert python.get_define()["memoryMax"] == 
resource_limit.get("memory_max")
diff --git a/tests/tasks/test_pytorch.py b/tests/tasks/test_pytorch.py
index c04723e..66c1664 100644
--- a/tests/tasks/test_pytorch.py
+++ b/tests/tasks/test_pytorch.py
@@ -122,3 +122,31 @@ def test_other_params(is_create_environment, project_path, 
python_command, expec
             python_command=python_command,
         )
         assert task.other_params == expect
+
+
[email protected](
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
+def test_pytorch_get_define_cpu_and_memory(resource_limit):
+    """Test task pytorch function get_define with resource limit."""
+    code = 123
+    version = 1
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        pytorch = Pytorch(name="test", script="", script_params="", 
**resource_limit)
+        assert "cpuQuota" in pytorch.get_define()
+        assert "memoryMax" in pytorch.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert pytorch.get_define()["cpuQuota"] == 
resource_limit.get("cpu_quota")
+
+        if "memoryMax" in resource_limit:
+            assert pytorch.get_define()["memoryMax"] == 
resource_limit.get("memory_max")
diff --git a/tests/tasks/test_shell.py b/tests/tasks/test_shell.py
index 3932c08..e768a2d 100644
--- a/tests/tasks/test_shell.py
+++ b/tests/tasks/test_shell.py
@@ -112,3 +112,33 @@ def test_resources_local_shell_command_content(
     """Test task shell task command content through the local resource 
plug-in."""
     task = Shell(**attr)
     assert expect == getattr(task, "raw_script")
+
+
[email protected](
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
+def test_shell_get_define_cpu_and_memory(resource_limit):
+    """Test task shell function get_define with resource limit."""
+    code = 123
+    version = 1
+    name = "test_shell_get_define_cpu_and_memory"
+    command = "echo test shell with resource limit"
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        shell = Shell(name, command, **resource_limit)
+        assert "cpuQuota" in shell.get_define()
+        assert "memoryMax" in shell.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert shell.get_define()["cpuQuota"] == 
resource_limit.get("cpu_quota")
+
+        if "memoryMax" in resource_limit:
+            assert shell.get_define()["memoryMax"] == 
resource_limit.get("memory_max")
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 998f711..07a3249 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -24,7 +24,6 @@ import os
 task_without_example = {
     "http",
     "sub_workflow",
-    "python",
     "procedure",
 }
 


Reply via email to