This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6d460a3ca2 [DSIP-13][python] New mechanism file plugins to Python API 
(#11360)
6d460a3ca2 is described below

commit 6d460a3ca2a08f524100100f49f8667a1f0093cb
Author: chenrj <[email protected]>
AuthorDate: Wed Sep 7 10:02:37 2022 +0800

    [DSIP-13][python] New mechanism file plugins to Python API (#11360)
---
 .../pydolphinscheduler/docs/source/index.rst       |   1 +
 .../docs/source/resources_plugin/develop.rst       |  46 ++++++++
 .../docs/source/{ => resources_plugin}/index.rst   |  31 ++----
 .../{index.rst => resources_plugin/local.rst}      |  31 ++----
 .../source/resources_plugin/resource-plugin.rst    |  75 +++++++++++++
 .../pydolphinscheduler/core/process_definition.py  |   3 +
 .../{exceptions.py => core/resource_plugin.py}     |  33 +++---
 .../src/pydolphinscheduler/core/task.py            |  47 ++++++++-
 .../examples/tutorial_resource_plugin.py           |  64 ++++++++++++
 .../src/pydolphinscheduler/exceptions.py           |   4 +
 .../__init__.py}                                   |  29 +-----
 .../pydolphinscheduler/resources_plugin/local.py   |  57 ++++++++++
 .../src/pydolphinscheduler/tasks/shell.py          |   5 +-
 .../pydolphinscheduler/tests/core/test_task.py     | 116 ++++++++++++++++++++-
 .../tests/example/test_example.py                  |   6 +-
 .../resources_plugin/__init__.py}                  |  26 +----
 .../tests/resources_plugin/test_local.py           | 108 +++++++++++++++++++
 .../pydolphinscheduler/tests/tasks/test_shell.py   |  43 +++++++-
 18 files changed, 612 insertions(+), 113 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst 
b/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
index 24ad107ad6..4dc0a949c9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
@@ -36,6 +36,7 @@ then go and see :doc:`tutorial` for more detail.
    cli
    config
    api
+   resources_plugin/index
 
 Indices and tables
 ==================
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/develop.rst
 
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/develop.rst
new file mode 100644
index 0000000000..9e112b240a
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/develop.rst
@@ -0,0 +1,46 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+How to develop
+==============
+
+When you want to create a new resource plugin, you need to add a new class in 
the module `resources_plugin`.
+
+The resource plug-in class needs to inherit the abstract class 
`ResourcePlugin` and implement its abstract method `read_file` function.
+
+The parameter of the `__init__` function of `ResourcePlugin` is the prefix of 
STR type. You can override this function when necessary.
+
+The `read_file` function parameter of `ResourcePlugin` is the file suffix of 
STR type, and its return value is the file content, if it exists and is 
readable.
+
+
+Example
+-------
+- Method `__init__`: Initiation method with `param`:`prefix`
+
+.. literalinclude:: ../../../src/pydolphinscheduler/resources_plugin/local.py
+    :start-after: [start init_method]
+    :end-before: [end init_method]
+
+- Method `read_file`: Get content from the given URI, The function parameter 
is the suffix of the file path.
+
+The file prefix has been initialized in init of the resource plug-in.
+
+The prefix plus suffix is the absolute path of the file in this resource.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/resources_plugin/local.py
+    :start-after: [start read_file_method]
+    :end-before: [end read_file_method]
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst 
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst
similarity index 58%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
copy to 
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst
index 24ad107ad6..05a7ebd94a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
+++ 
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst
@@ -15,31 +15,14 @@
    specific language governing permissions and limitations
    under the License.
 
-PyDolphinScheduler
-==================
-
-**PyDolphinScheduler** is Python API for `Apache DolphinScheduler 
<https://dolphinscheduler.apache.org>`_,
-which allow you definition your workflow by Python code, aka workflow-as-codes.
-
-I could go and find how to :ref:`install <start:getting started>` the project. 
Or if you want to see simply example
-then go and see :doc:`tutorial` for more detail.
+Resources_plugin
+================
 
+In this section
 
 .. toctree::
-   :maxdepth: 2
-
-   start
-   tutorial
-   concept
-   tasks/index
-   howto/index
-   cli
-   config
-   api
-
-Indices and tables
-==================
+   :maxdepth: 1
 
-* :ref:`genindex`
-* :ref:`modindex`
-* :ref:`search`
+   develop
+   resource-plugin
+   local
\ No newline at end of file
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst 
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/local.rst
similarity index 57%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
copy to 
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/local.rst
index 24ad107ad6..5da025a5c7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
+++ 
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/local.rst
@@ -15,31 +15,18 @@
    specific language governing permissions and limitations
    under the License.
 
-PyDolphinScheduler
-==================
+Local
+=====
 
-**PyDolphinScheduler** is Python API for `Apache DolphinScheduler 
<https://dolphinscheduler.apache.org>`_,
-which allow you definition your workflow by Python code, aka workflow-as-codes.
+`Local` is a local resource plugin for pydolphinscheduler.
 
-I could go and find how to :ref:`install <start:getting started>` the project. 
Or if you want to see simply example
-then go and see :doc:`tutorial` for more detail.
+When using a local resource plugin, you only need to add the `resource_plugin` 
parameter in the task subclass or workflow definition,
+such as `resource_plugin=Local("/tmp")`.
 
 
-.. toctree::
-   :maxdepth: 2
+For the specific use of resource plugins, you can see `How to use` in 
:doc:`./resource-plugin`
 
-   start
-   tutorial
-   concept
-   tasks/index
-   howto/index
-   cli
-   config
-   api
+Dive Into
+---------
 
-Indices and tables
-==================
-
-* :ref:`genindex`
-* :ref:`modindex`
-* :ref:`search`
+.. automodule:: pydolphinscheduler.resources_plugin.local
\ No newline at end of file
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst
 
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst
new file mode 100644
index 0000000000..0b90eeecbf
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst
@@ -0,0 +1,75 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+ResourcePlugin
+==============
+
+`ResourcePlugin` is an abstract class of resource plug-in parameters of task 
subclass and workflow.
+All resource plugins need to inherit and override its abstract methods.
+
+Code
+----
+.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
+   :start-after: [start resource_plugin_definition]
+   :end-before: [end resource_plugin_definition]
+
+Dive Into
+---------
+It has the following key functions.
+
+- Method `__init__`: The `__init__` function has STR type parameter `prefix`, 
which means the prefix of the resource.
+
+You can rewrite this function if necessary.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
+    :start-after: [start init_method]
+    :end-before: [end init_method]
+
+- Method `read_file`: Get content from the given URI, The function parameter 
is the suffix of the file path.
+
+The file prefix has been initialized in init of the resource plug-in.
+
+The prefix plus suffix is the absolute path of the file in this resource.
+
+It is an abstract function. You must rewrite it
+
+.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
+    :start-after: [start abstractmethod read_file]
+    :end-before: [end abstractmethod read_file]
+
+.. automodule:: pydolphinscheduler.core.resource_plugin
+
+How to use
+----------
+Resource plug-ins can be used in task subclasses and workflows. You can use 
the resource plug-ins by adding the `resource_plugin` parameter when they are 
initialized.
+For example, local resource plug-ins, add `resource_plugin = Local("/tmp")`.
+
+The resource plug-ins we currently support is `local`.
+
+Here is an example.
+
+.. literalinclude:: 
../../../src/pydolphinscheduler/examples/tutorial_resource_plugin.py
+   :start-after: [start workflow_declare]
+   :end-before: [end task_declare]
+
+When the resource_plugin parameter is defined in both the task subclass and 
the workflow, the resource_plugin defined in the task subclass is used first.
+
+If the task subclass does not define resource_plugin, but the resource_plugin 
is defined in the workflow, the resource_plugin in the workflow is used.
+
+Of course, if neither the task subclass nor the workflow specifies 
resource_plugin, the command at this time will be executed as a script,
+
+in other words, we are forward compatible.
\ No newline at end of file
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index df05b01ede..62de7ed1b4 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional, Set
 from pydolphinscheduler import configuration
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.core.resource_plugin import ResourcePlugin
 from pydolphinscheduler.exceptions import PyDSParamException, 
PyDSTaskNoFoundException
 from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base, Project, Tenant, User
@@ -111,6 +112,7 @@ class ProcessDefinition(Base):
         timeout: Optional[int] = 0,
         release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
         param: Optional[Dict] = None,
+        resource_plugin: Optional[ResourcePlugin] = None,
         resource_list: Optional[List[Resource]] = None,
     ):
         super().__init__(name, description)
@@ -134,6 +136,7 @@ class ProcessDefinition(Base):
         self._release_state = release_state
         self.param = param
         self.tasks: dict = {}
+        self.resource_plugin = resource_plugin
         # TODO how to fix circle import
         self._task_relations: set["TaskRelation"] = set()  # noqa: F821
         self._process_definition_code = None
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource_plugin.py
similarity index 50%
copy from 
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
copy to 
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource_plugin.py
index 4d70a58637..457a7c27b7 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource_plugin.py
@@ -15,28 +15,35 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Exceptions for pydolphinscheduler."""
+"""DolphinScheduler ResourcePlugin object."""
 
 
-class PyDSBaseException(Exception):
-    """Base exception for pydolphinscheduler."""
+from abc import ABCMeta, abstractmethod
 
 
-class PyDSParamException(PyDSBaseException):
-    """Exception for pydolphinscheduler parameter verify error."""
+# [start resource_plugin_definition]
+class ResourcePlugin(object, metaclass=ABCMeta):
+    """ResourcePlugin object, declare resource plugin for task and workflow to 
dolphinscheduler.
 
+    :param prefix: A string representing the prefix of ResourcePlugin.
 
-class PyDSTaskNoFoundException(PyDSBaseException):
-    """Exception for pydolphinscheduler workflow task no found error."""
+    """
 
+    # [start init_method]
+    def __init__(self, prefix: str, *args, **kwargs):
+        self.prefix = prefix
 
-class PyDSJavaGatewayException(PyDSBaseException):
-    """Exception for pydolphinscheduler Java gateway error."""
+    # [end init_method]
 
+    # [start abstractmethod read_file]
+    @abstractmethod
+    def read_file(self, suf: str):
+        """Get the content of the file.
 
-class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
-    """Exception for pydolphinscheduler process definition not assign error."""
+        The address of the file is the prefix of the resource plugin plus the 
parameter suf.
+        """
 
+    # [end abstractmethod read_file]
 
-class PyDSConfException(PyDSBaseException):
-    """Exception for pydolphinscheduler configuration error."""
+
+# [end resource_plugin_definition]
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index d1a2eae9dd..93eb32352c 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -33,7 +33,8 @@ from pydolphinscheduler.core.process_definition import (
     ProcessDefinitionContext,
 )
 from pydolphinscheduler.core.resource import Resource
-from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.core.resource_plugin import ResourcePlugin
+from pydolphinscheduler.exceptions import PyDSParamException, 
PyResPluginException
 from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base
 
@@ -101,6 +102,9 @@ class Task(Base):
 
     _task_custom_attr: set = set()
 
+    ext: set = None
+    ext_attr: str = None
+
     DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
 
     def __init__(
@@ -124,6 +128,7 @@ class Task(Base):
         dependence: Optional[Dict] = None,
         wait_start_timeout: Optional[Dict] = None,
         condition_result: Optional[Dict] = None,
+        resource_plugin: Optional[ResourcePlugin] = None,
     ):
 
         super().__init__(name, description)
@@ -166,6 +171,8 @@ class Task(Base):
         self.dependence = dependence or {}
         self.wait_start_timeout = wait_start_timeout or {}
         self._condition_result = condition_result or 
self.DEFAULT_CONDITION_RESULT
+        self.resource_plugin = resource_plugin
+        self.get_content()
 
     @property
     def process_definition(self) -> Optional[ProcessDefinition]:
@@ -229,6 +236,44 @@ class Task(Base):
         custom_attr |= self._task_custom_attr
         return self.get_define_custom(custom_attr=custom_attr)
 
+    def get_plugin(self):
+        """Return the resource plug-in.
+
+        according to parameter resource_plugin and parameter
+        process_definition.resource_plugin.
+        """
+        if self.resource_plugin is None:
+            if self.process_definition.resource_plugin is not None:
+                return self.process_definition.resource_plugin
+            else:
+                raise PyResPluginException(
+                    "The execution command of this task is a file, but the 
resource plugin is empty"
+                )
+        else:
+            return self.resource_plugin
+
+    def get_content(self):
+        """Get the file content according to the resource plugin."""
+        if self.ext_attr is None and self.ext is None:
+            return
+
+        _ext_attr = getattr(self, self.ext_attr)
+
+        if _ext_attr is not None:
+            if _ext_attr.endswith(tuple(self.ext)):
+                res = self.get_plugin()
+                content = res.read_file(_ext_attr)
+                setattr(self, self.ext_attr.lstrip("_"), content)
+            else:
+                index = _ext_attr.rfind(".")
+                if index != -1:
+                    raise ValueError(
+                        "This task does not support files with suffix {}, only 
supports {}".format(
+                            _ext_attr[index:], ",".join(str(suf) for suf in 
self.ext)
+                        )
+                    )
+                setattr(self, self.ext_attr.lstrip("_"), _ext_attr)
+
     def __hash__(self):
         return hash(self.code)
 
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
new file mode 100644
index 0000000000..5b02022ee9
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+A tutorial example take you to experience pydolphinscheduler resource plugin.
+
+Resource plug-ins can be defined in workflows and tasks
+
+it will instantiate and run all the task it have.
+"""
+import os
+from pathlib import Path
+
+# [start tutorial_resource_plugin]
+# [start package_import]
+# Import ProcessDefinition object to define your workflow attributes
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+
+# Import task Shell object cause we would create some shell tasks later
+from pydolphinscheduler.resources_plugin.local import Local
+from pydolphinscheduler.tasks.shell import Shell
+
+# [end package_import]
+
+# [start workflow_declare]
+with ProcessDefinition(
+    name="tutorial_resource_plugin",
+    schedule="0 0 0 * * ? *",
+    start_time="2021-01-01",
+    tenant="tenant_exists",
+    resource_plugin=Local("/tmp"),
+) as process_definition:
+    # [end workflow_declare]
+    # [start task_declare]
+    file = "resource.sh"
+    path = Path("/tmp").joinpath(file)
+    with open(str(path), "w") as f:
+        f.write("echo tutorial resource plugin")
+    task_parent = Shell(
+        name="local-resource-example",
+        command=file,
+    )
+    print(task_parent.task_params)
+    os.remove(path)
+    # [end task_declare]
+
+    # [start submit_or_run]
+    process_definition.run()
+    # [end submit_or_run]
+# [end tutorial_resource_plugin]
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
index 4d70a58637..5b0d1bb61f 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
@@ -40,3 +40,7 @@ class 
PyDSProcessDefinitionNotAssignException(PyDSBaseException):
 
 class PyDSConfException(PyDSBaseException):
     """Exception for pydolphinscheduler configuration error."""
+
+
+class PyResPluginException(PyDSBaseException):
+    """Exception for pydolphinscheduler resource plugin error."""
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py
similarity index 51%
copy from 
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
copy to 
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py
index 4d70a58637..b6bc7a5ffb 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py
@@ -15,28 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Exceptions for pydolphinscheduler."""
+"""Init resources_plugin package."""
+from pydolphinscheduler.resources_plugin.local import Local
 
-
-class PyDSBaseException(Exception):
-    """Base exception for pydolphinscheduler."""
-
-
-class PyDSParamException(PyDSBaseException):
-    """Exception for pydolphinscheduler parameter verify error."""
-
-
-class PyDSTaskNoFoundException(PyDSBaseException):
-    """Exception for pydolphinscheduler workflow task no found error."""
-
-
-class PyDSJavaGatewayException(PyDSBaseException):
-    """Exception for pydolphinscheduler Java gateway error."""
-
-
-class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
-    """Exception for pydolphinscheduler process definition not assign error."""
-
-
-class PyDSConfException(PyDSBaseException):
-    """Exception for pydolphinscheduler configuration error."""
+__all__ = [
+    "Local",
+]
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py
new file mode 100644
index 0000000000..8a20ed9737
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DolphinScheduler local resource plugin."""
+
+import os
+from pathlib import Path
+
+from pydolphinscheduler.core.resource_plugin import ResourcePlugin
+from pydolphinscheduler.exceptions import PyResPluginException
+
+
+class Local(ResourcePlugin):
+    """Local object, declare local resource plugin for task and workflow to 
dolphinscheduler.
+
+    :param prefix: A string representing the prefix of Local.
+
+    """
+
+    # [start init_method]
+    def __init__(self, prefix: str, *args, **kwargs):
+        super().__init__(prefix, *args, **kwargs)
+
+    # [end init_method]
+
+    # [start read_file_method]
+    def read_file(self, suf: str):
+        """Get the content of the file.
+
+        The address of the file is the prefix of the resource plugin plus the 
parameter suf.
+        """
+        path = Path(self.prefix).joinpath(suf)
+        if not path.exists():
+            raise PyResPluginException("{} is not found".format(str(path)))
+        if not os.access(str(path), os.R_OK):
+            raise PyResPluginException(
+                "You don't have permission to access {}".format(self.prefix + 
suf)
+            )
+        with open(path, "r") as f:
+            content = f.read()
+        return content
+
+    # [end read_file_method]
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
index 9a73535c8c..36ec4e87d0 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
@@ -50,6 +50,9 @@ class Shell(Task):
         "raw_script",
     }
 
+    ext: set = {".sh", ".zsh"}
+    ext_attr: str = "_raw_script"
+
     def __init__(self, name: str, command: str, *args, **kwargs):
+        self._raw_script = command
         super().__init__(name, TaskType.SHELL, *args, **kwargs)
-        self.raw_script = command
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py 
b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 3909077b98..cb06c21ae4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -18,12 +18,14 @@
 """Test Task class function."""
 import logging
 import re
-from unittest.mock import patch
+from unittest.mock import PropertyMock, patch
 
 import pytest
 
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.core.task import Task, TaskRelation
+from pydolphinscheduler.exceptions import PyResPluginException
+from pydolphinscheduler.resources_plugin import Local
 from tests.testing.task import Task as testTask
 from tests.testing.task import TaskWithCode
 
@@ -252,6 +254,118 @@ def test_add_duplicate(caplog):
         )
 
 
[email protected](
+    "val, expected",
+    [
+        ("a.sh", "echo Test task attribute ext_attr"),
+        ("a.zsh", "echo Test task attribute ext_attr"),
+        ("echo Test task attribute ext_attr", "echo Test task attribute 
ext_attr"),
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.ext",
+    new_callable=PropertyMock,
+    return_value={".sh", ".zsh"},
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.ext_attr",
+    new_callable=PropertyMock,
+    return_value="_raw_script",
+)
+@patch(
+    "pydolphinscheduler.core.task.Task._raw_script",
+    create=True,
+    new_callable=PropertyMock,
+)
+@patch("pydolphinscheduler.core.task.Task.get_plugin")
+def test_task_ext_attr(
+    m_plugin, m_raw_script, m_ext_attr, m_ext, m_code_version, val, expected
+):
+    """Test task attribute ext_attr."""
+    m_plugin.return_value.read_file.return_value = expected
+    m_raw_script.return_value = val
+    task = Task("test_task_ext_attr", "test_task_ext_attr")
+    assert expected == getattr(task, "raw_script")
+
+
[email protected](
+    "attr, expected",
+    [
+        (
+            {
+                "name": "test_task_abtain_res_plugin",
+                "task_type": "TaskType",
+                "resource_plugin": Local("prefix"),
+                "process_definition": ProcessDefinition(
+                    name="process_definition",
+                    resource_plugin=Local("prefix"),
+                ),
+            },
+            "Local",
+        ),
+        (
+            {
+                "name": "test_task_abtain_res_plugin",
+                "task_type": "TaskType",
+                "resource_plugin": Local("prefix"),
+            },
+            "Local",
+        ),
+        (
+            {
+                "name": "test_task_abtain_res_plugin",
+                "task_type": "TaskType",
+                "process_definition": ProcessDefinition(
+                    name="process_definition",
+                    resource_plugin=Local("prefix"),
+                ),
+            },
+            "Local",
+        ),
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch("pydolphinscheduler.core.task.Task.get_content")
+def test_task_obtain_res_plugin(m_get_content, m_code_version, attr, expected):
+    """Test task obtaining resource plug-in."""
+    task = Task(**attr)
+    assert expected == task.get_plugin().__class__.__name__
+
+
[email protected](
+    "attr",
+    [
+        {
+            "name": "test_task_abtain_res_plugin",
+            "task_type": "TaskType",
+            "process_definition": ProcessDefinition(
+                name="process_definition",
+            ),
+        },
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch("pydolphinscheduler.core.task.Task.get_content")
+def test_task_obtain_res_plugin_exception(m_get_content, m_code_version, attr):
+    """Test task obtaining resource plug-in exception."""
+    with pytest.raises(
+        PyResPluginException,
+        match="The execution command of this task is a file, but the resource 
plugin is empty",
+    ):
+        task = Task(**attr)
+        task.get_plugin()
+
+
 @pytest.mark.parametrize(
     "resources, expect",
     [
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py 
b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
index 70f367767c..319ad961f7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
@@ -97,7 +97,11 @@ def test_example_basic():
         ), f"We expect all examples is python script, but get {ex.name}."
 
         # All except tutorial and __init__ is end with keyword "_example"
-        if ex.stem not in ("tutorial", "tutorial_decorator") and ex.stem != 
"__init__":
+        if (
+            ex.stem
+            not in ("tutorial", "tutorial_decorator", 
"tutorial_resource_plugin")
+            and ex.stem != "__init__"
+        ):
             assert ex.stem.endswith(
                 "_example"
             ), f"We expect all examples script end with keyword '_example', 
but get {ex.stem}."
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
 b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/__init__.py
similarity index 51%
copy from 
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
copy to 
dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/__init__.py
index 4d70a58637..0b6bdf360b 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/__init__.py
@@ -15,28 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Exceptions for pydolphinscheduler."""
-
-
-class PyDSBaseException(Exception):
-    """Base exception for pydolphinscheduler."""
-
-
-class PyDSParamException(PyDSBaseException):
-    """Exception for pydolphinscheduler parameter verify error."""
-
-
-class PyDSTaskNoFoundException(PyDSBaseException):
-    """Exception for pydolphinscheduler workflow task no found error."""
-
-
-class PyDSJavaGatewayException(PyDSBaseException):
-    """Exception for pydolphinscheduler Java gateway error."""
-
-
-class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
-    """Exception for pydolphinscheduler process definition not assign error."""
-
-
-class PyDSConfException(PyDSBaseException):
-    """Exception for pydolphinscheduler configuration error."""
+"""Init resources_plugin package tests."""
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_local.py
 
b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_local.py
new file mode 100644
index 0000000000..82b196f75a
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_local.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test local resource plugin."""
+from pathlib import Path
+from unittest.mock import PropertyMock, patch
+
+import pytest
+
+from pydolphinscheduler.core import Task
+from pydolphinscheduler.exceptions import PyResPluginException
+from pydolphinscheduler.resources_plugin.local import Local
+from pydolphinscheduler.utils import file
+from tests.testing.file import delete_file
+
+file_name = "local_res.sh"
+file_content = "echo Test local res plugin"
+res_plugin_prefix = Path(__file__).parent
+file_path = res_plugin_prefix.joinpath(file_name)
+
+
[email protected]()
+def setup_crt_first():
+    """Set up and teardown about create file first and then delete it."""
+    file.write(content=file_content, to_path=file_path)
+    yield
+    delete_file(file_path)
+
+
[email protected](
+    "val, expected",
+    [
+        (file_name, file_content),
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.ext",
+    new_callable=PropertyMock,
+    return_value={
+        ".sh",
+    },
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.ext_attr",
+    new_callable=PropertyMock,
+    return_value="_raw_script",
+)
+@patch(
+    "pydolphinscheduler.core.task.Task._raw_script",
+    create=True,
+    new_callable=PropertyMock,
+)
+def test_task_obtain_res_plugin(
+    m_raw_script, m_ext_attr, m_ext, m_code_version, val, expected, 
setup_crt_first
+):
+    """Test task obtaining resource plug-in."""
+    m_raw_script.return_value = val
+    task = Task(
+        name="test_task_ext_attr",
+        task_type="type",
+        resource_plugin=Local(str(res_plugin_prefix)),
+    )
+    assert expected == getattr(task, "raw_script")
+
+
[email protected](
+    "attr, expected",
+    [({"prefix": res_plugin_prefix, "file_name": file_name}, file_content)],
+)
+def test_local_res_read_file(attr, expected, setup_crt_first):
+    """Test the read_file function of the local resource plug-in."""
+    local = Local(str(attr.get("prefix")))
+    local.read_file(attr.get("file_name"))
+    assert expected == local.read_file(file_name)
+
+
[email protected](
+    "attr",
+    [
+        {"prefix": res_plugin_prefix, "file_name": file_name},
+    ],
+)
+def test_local_res_file_not_found(attr):
+    """Test local resource plugin file does not exist."""
+    with pytest.raises(
+        PyResPluginException,
+        match=".* is not found",
+    ):
+        local = Local(str(attr.get("prefix")))
+        local.read_file(attr.get("file_name"))
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py 
b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
index e2c87d8e7d..9344ac2bb0 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
@@ -17,12 +17,28 @@
 
 """Test Task shell."""
 
-
+from pathlib import Path
 from unittest.mock import patch
 
 import pytest
 
+from pydolphinscheduler.resources_plugin import Local
 from pydolphinscheduler.tasks.shell import Shell
+from pydolphinscheduler.utils import file
+from tests.testing.file import delete_file
+
+file_name = "local_res.sh"
+file_content = 'echo "test res_local"'
+res_plugin_prefix = Path(__file__).parent
+file_path = res_plugin_prefix.joinpath(file_name)
+
+
[email protected]
+def setup_crt_first():
+    """Set up and teardown about create file first and then delete it."""
+    file.write(content=file_content, to_path=file_path)
+    yield
+    delete_file(file_path)
 
 
 @pytest.mark.parametrize(
@@ -90,3 +106,28 @@ def test_shell_get_define():
         shell = Shell(name, command)
         print(shell.get_define())
         assert shell.get_define() == expect
+
+
[email protected](
+    "attr, expect",
+    [
+        (
+            {
+                "name": "test-local-res-command-content",
+                "command": file_name,
+                "resource_plugin": Local(str(res_plugin_prefix)),
+            },
+            file_content,
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_resources_local_shell_command_content(
+    mock_code_version, attr, expect, setup_crt_first
+):
+    """Test task shell task command content through the local resource 
plug-in."""
+    task = Shell(**attr)
+    assert expect == getattr(task, "raw_script")

Reply via email to