This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 6d460a3ca2 [DSIP-13][python] New mechanism file plugins to Python API
(#11360)
6d460a3ca2 is described below
commit 6d460a3ca2a08f524100100f49f8667a1f0093cb
Author: chenrj <[email protected]>
AuthorDate: Wed Sep 7 10:02:37 2022 +0800
[DSIP-13][python] New mechanism file plugins to Python API (#11360)
---
.../pydolphinscheduler/docs/source/index.rst | 1 +
.../docs/source/resources_plugin/develop.rst | 46 ++++++++
.../docs/source/{ => resources_plugin}/index.rst | 31 ++----
.../{index.rst => resources_plugin/local.rst} | 31 ++----
.../source/resources_plugin/resource-plugin.rst | 75 +++++++++++++
.../pydolphinscheduler/core/process_definition.py | 3 +
.../{exceptions.py => core/resource_plugin.py} | 33 +++---
.../src/pydolphinscheduler/core/task.py | 47 ++++++++-
.../examples/tutorial_resource_plugin.py | 64 ++++++++++++
.../src/pydolphinscheduler/exceptions.py | 4 +
.../__init__.py} | 29 +-----
.../pydolphinscheduler/resources_plugin/local.py | 57 ++++++++++
.../src/pydolphinscheduler/tasks/shell.py | 5 +-
.../pydolphinscheduler/tests/core/test_task.py | 116 ++++++++++++++++++++-
.../tests/example/test_example.py | 6 +-
.../resources_plugin/__init__.py} | 26 +----
.../tests/resources_plugin/test_local.py | 108 +++++++++++++++++++
.../pydolphinscheduler/tests/tasks/test_shell.py | 43 +++++++-
18 files changed, 612 insertions(+), 113 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
b/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
index 24ad107ad6..4dc0a949c9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
@@ -36,6 +36,7 @@ then go and see :doc:`tutorial` for more detail.
cli
config
api
+ resources_plugin/index
Indices and tables
==================
diff --git
a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/develop.rst
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/develop.rst
new file mode 100644
index 0000000000..9e112b240a
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/develop.rst
@@ -0,0 +1,46 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+How to develop
+==============
+
+When you want to create a new resource plugin, you need to add a new class in
the module `resources_plugin`.
+
+The resource plug-in class needs to inherit the abstract class
`ResourcePlugin` and implement its abstract method `read_file` function.
+
+The parameter of the `__init__` function of `ResourcePlugin` is the prefix of
STR type. You can override this function when necessary.
+
+The `read_file` function parameter of `ResourcePlugin` is the file suffix of
STR type, and its return value is the file content, if it exists and is
readable.
+
+
+Example
+-------
+- Method `__init__`: Initiation method with `param`:`prefix`
+
+.. literalinclude:: ../../../src/pydolphinscheduler/resources_plugin/local.py
+ :start-after: [start init_method]
+ :end-before: [end init_method]
+
+- Method `read_file`: Get content from the given URI, The function parameter
is the suffix of the file path.
+
+The file prefix has been initialized in init of the resource plug-in.
+
+The prefix plus suffix is the absolute path of the file in this resource.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/resources_plugin/local.py
+ :start-after: [start read_file_method]
+ :end-before: [end read_file_method]
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst
similarity index 58%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
copy to
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst
index 24ad107ad6..05a7ebd94a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
+++
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst
@@ -15,31 +15,14 @@
specific language governing permissions and limitations
under the License.
-PyDolphinScheduler
-==================
-
-**PyDolphinScheduler** is Python API for `Apache DolphinScheduler
<https://dolphinscheduler.apache.org>`_,
-which allow you definition your workflow by Python code, aka workflow-as-codes.
-
-I could go and find how to :ref:`install <start:getting started>` the project.
Or if you want to see simply example
-then go and see :doc:`tutorial` for more detail.
+Resources_plugin
+================
+In this section
.. toctree::
- :maxdepth: 2
-
- start
- tutorial
- concept
- tasks/index
- howto/index
- cli
- config
- api
-
-Indices and tables
-==================
+ :maxdepth: 1
-* :ref:`genindex`
-* :ref:`modindex`
-* :ref:`search`
+ develop
+ resource-plugin
+ local
\ No newline at end of file
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/local.rst
similarity index 57%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
copy to
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/local.rst
index 24ad107ad6..5da025a5c7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
+++
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/local.rst
@@ -15,31 +15,18 @@
specific language governing permissions and limitations
under the License.
-PyDolphinScheduler
-==================
+Local
+=====
-**PyDolphinScheduler** is Python API for `Apache DolphinScheduler
<https://dolphinscheduler.apache.org>`_,
-which allow you definition your workflow by Python code, aka workflow-as-codes.
+`Local` is a local resource plugin for pydolphinscheduler.
-I could go and find how to :ref:`install <start:getting started>` the project.
Or if you want to see simply example
-then go and see :doc:`tutorial` for more detail.
+When using a local resource plugin, you only need to add the `resource_plugin`
parameter in the task subclass or workflow definition,
+such as `resource_plugin=Local("/tmp")`.
-.. toctree::
- :maxdepth: 2
+For the specific use of resource plugins, you can see `How to use` in
:doc:`./resource-plugin`
- start
- tutorial
- concept
- tasks/index
- howto/index
- cli
- config
- api
+Dive Into
+---------
-Indices and tables
-==================
-
-* :ref:`genindex`
-* :ref:`modindex`
-* :ref:`search`
+.. automodule:: pydolphinscheduler.resources_plugin.local
\ No newline at end of file
diff --git
a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst
new file mode 100644
index 0000000000..0b90eeecbf
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst
@@ -0,0 +1,75 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+ResourcePlugin
+==============
+
+`ResourcePlugin` is an abstract class of resource plug-in parameters of task
subclass and workflow.
+All resource plugins need to inherit and override its abstract methods.
+
+Code
+----
+.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
+ :start-after: [start resource_plugin_definition]
+ :end-before: [end resource_plugin_definition]
+
+Dive Into
+---------
+It has the following key functions.
+
+- Method `__init__`: The `__init__` function has STR type parameter `prefix`,
which means the prefix of the resource.
+
+You can rewrite this function if necessary.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
+ :start-after: [start init_method]
+ :end-before: [end init_method]
+
+- Method `read_file`: Get content from the given URI, The function parameter
is the suffix of the file path.
+
+The file prefix has been initialized in init of the resource plug-in.
+
+The prefix plus suffix is the absolute path of the file in this resource.
+
+It is an abstract function. You must rewrite it
+
+.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
+ :start-after: [start abstractmethod read_file]
+ :end-before: [end abstractmethod read_file]
+
+.. automodule:: pydolphinscheduler.core.resource_plugin
+
+How to use
+----------
+Resource plug-ins can be used in task subclasses and workflows. You can use
the resource plug-ins by adding the `resource_plugin` parameter when they are
initialized.
+For example, local resource plug-ins, add `resource_plugin = Local("/tmp")`.
+
+The resource plug-ins we currently support is `local`.
+
+Here is an example.
+
+.. literalinclude::
../../../src/pydolphinscheduler/examples/tutorial_resource_plugin.py
+ :start-after: [start workflow_declare]
+ :end-before: [end task_declare]
+
+When the resource_plugin parameter is defined in both the task subclass and
the workflow, the resource_plugin defined in the task subclass is used first.
+
+If the task subclass does not define resource_plugin, but the resource_plugin
is defined in the workflow, the resource_plugin in the workflow is used.
+
+Of course, if neither the task subclass nor the workflow specifies
resource_plugin, the command at this time will be executed as a script,
+
+in other words, we are forward compatible.
\ No newline at end of file
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index df05b01ede..62de7ed1b4 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional, Set
from pydolphinscheduler import configuration
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.exceptions import PyDSParamException,
PyDSTaskNoFoundException
from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base, Project, Tenant, User
@@ -111,6 +112,7 @@ class ProcessDefinition(Base):
timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
+ resource_plugin: Optional[ResourcePlugin] = None,
resource_list: Optional[List[Resource]] = None,
):
super().__init__(name, description)
@@ -134,6 +136,7 @@ class ProcessDefinition(Base):
self._release_state = release_state
self.param = param
self.tasks: dict = {}
+ self.resource_plugin = resource_plugin
# TODO how to fix circle import
self._task_relations: set["TaskRelation"] = set() # noqa: F821
self._process_definition_code = None
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource_plugin.py
similarity index 50%
copy from
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
copy to
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource_plugin.py
index 4d70a58637..457a7c27b7 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource_plugin.py
@@ -15,28 +15,35 @@
# specific language governing permissions and limitations
# under the License.
-"""Exceptions for pydolphinscheduler."""
+"""DolphinScheduler ResourcePlugin object."""
-class PyDSBaseException(Exception):
- """Base exception for pydolphinscheduler."""
+from abc import ABCMeta, abstractmethod
-class PyDSParamException(PyDSBaseException):
- """Exception for pydolphinscheduler parameter verify error."""
+# [start resource_plugin_definition]
+class ResourcePlugin(object, metaclass=ABCMeta):
+ """ResourcePlugin object, declare resource plugin for task and workflow to
dolphinscheduler.
+ :param prefix: A string representing the prefix of ResourcePlugin.
-class PyDSTaskNoFoundException(PyDSBaseException):
- """Exception for pydolphinscheduler workflow task no found error."""
+ """
+ # [start init_method]
+ def __init__(self, prefix: str, *args, **kwargs):
+ self.prefix = prefix
-class PyDSJavaGatewayException(PyDSBaseException):
- """Exception for pydolphinscheduler Java gateway error."""
+ # [end init_method]
+ # [start abstractmethod read_file]
+ @abstractmethod
+ def read_file(self, suf: str):
+ """Get the content of the file.
-class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
- """Exception for pydolphinscheduler process definition not assign error."""
+ The address of the file is the prefix of the resource plugin plus the
parameter suf.
+ """
+ # [end abstractmethod read_file]
-class PyDSConfException(PyDSBaseException):
- """Exception for pydolphinscheduler configuration error."""
+
+# [end resource_plugin_definition]
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index d1a2eae9dd..93eb32352c 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -33,7 +33,8 @@ from pydolphinscheduler.core.process_definition import (
ProcessDefinitionContext,
)
from pydolphinscheduler.core.resource import Resource
-from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.core.resource_plugin import ResourcePlugin
+from pydolphinscheduler.exceptions import PyDSParamException,
PyResPluginException
from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base
@@ -101,6 +102,9 @@ class Task(Base):
_task_custom_attr: set = set()
+ ext: set = None
+ ext_attr: str = None
+
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
def __init__(
@@ -124,6 +128,7 @@ class Task(Base):
dependence: Optional[Dict] = None,
wait_start_timeout: Optional[Dict] = None,
condition_result: Optional[Dict] = None,
+ resource_plugin: Optional[ResourcePlugin] = None,
):
super().__init__(name, description)
@@ -166,6 +171,8 @@ class Task(Base):
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
self._condition_result = condition_result or
self.DEFAULT_CONDITION_RESULT
+ self.resource_plugin = resource_plugin
+ self.get_content()
@property
def process_definition(self) -> Optional[ProcessDefinition]:
@@ -229,6 +236,44 @@ class Task(Base):
custom_attr |= self._task_custom_attr
return self.get_define_custom(custom_attr=custom_attr)
+ def get_plugin(self):
+ """Return the resource plug-in.
+
+ according to parameter resource_plugin and parameter
+ process_definition.resource_plugin.
+ """
+ if self.resource_plugin is None:
+ if self.process_definition.resource_plugin is not None:
+ return self.process_definition.resource_plugin
+ else:
+ raise PyResPluginException(
+ "The execution command of this task is a file, but the
resource plugin is empty"
+ )
+ else:
+ return self.resource_plugin
+
+ def get_content(self):
+ """Get the file content according to the resource plugin."""
+ if self.ext_attr is None and self.ext is None:
+ return
+
+ _ext_attr = getattr(self, self.ext_attr)
+
+ if _ext_attr is not None:
+ if _ext_attr.endswith(tuple(self.ext)):
+ res = self.get_plugin()
+ content = res.read_file(_ext_attr)
+ setattr(self, self.ext_attr.lstrip("_"), content)
+ else:
+ index = _ext_attr.rfind(".")
+ if index != -1:
+ raise ValueError(
+ "This task does not support files with suffix {}, only
supports {}".format(
+ _ext_attr[index:], ",".join(str(suf) for suf in
self.ext)
+ )
+ )
+ setattr(self, self.ext_attr.lstrip("_"), _ext_attr)
+
def __hash__(self):
return hash(self.code)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
new file mode 100644
index 0000000000..5b02022ee9
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+A tutorial example take you to experience pydolphinscheduler resource plugin.
+
+Resource plug-ins can be defined in workflows and tasks
+
+it will instantiate and run all the task it have.
+"""
+import os
+from pathlib import Path
+
+# [start tutorial_resource_plugin]
+# [start package_import]
+# Import ProcessDefinition object to define your workflow attributes
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+
+# Import task Shell object cause we would create some shell tasks later
+from pydolphinscheduler.resources_plugin.local import Local
+from pydolphinscheduler.tasks.shell import Shell
+
+# [end package_import]
+
+# [start workflow_declare]
+with ProcessDefinition(
+ name="tutorial_resource_plugin",
+ schedule="0 0 0 * * ? *",
+ start_time="2021-01-01",
+ tenant="tenant_exists",
+ resource_plugin=Local("/tmp"),
+) as process_definition:
+ # [end workflow_declare]
+ # [start task_declare]
+ file = "resource.sh"
+ path = Path("/tmp").joinpath(file)
+ with open(str(path), "w") as f:
+ f.write("echo tutorial resource plugin")
+ task_parent = Shell(
+ name="local-resource-example",
+ command=file,
+ )
+ print(task_parent.task_params)
+ os.remove(path)
+ # [end task_declare]
+
+ # [start submit_or_run]
+ process_definition.run()
+ # [end submit_or_run]
+# [end tutorial_resource_plugin]
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
index 4d70a58637..5b0d1bb61f 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
@@ -40,3 +40,7 @@ class
PyDSProcessDefinitionNotAssignException(PyDSBaseException):
class PyDSConfException(PyDSBaseException):
"""Exception for pydolphinscheduler configuration error."""
+
+
+class PyResPluginException(PyDSBaseException):
+ """Exception for pydolphinscheduler resource plugin error."""
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py
similarity index 51%
copy from
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
copy to
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py
index 4d70a58637..b6bc7a5ffb 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py
@@ -15,28 +15,9 @@
# specific language governing permissions and limitations
# under the License.
-"""Exceptions for pydolphinscheduler."""
+"""Init resources_plugin package."""
+from pydolphinscheduler.resources_plugin.local import Local
-
-class PyDSBaseException(Exception):
- """Base exception for pydolphinscheduler."""
-
-
-class PyDSParamException(PyDSBaseException):
- """Exception for pydolphinscheduler parameter verify error."""
-
-
-class PyDSTaskNoFoundException(PyDSBaseException):
- """Exception for pydolphinscheduler workflow task no found error."""
-
-
-class PyDSJavaGatewayException(PyDSBaseException):
- """Exception for pydolphinscheduler Java gateway error."""
-
-
-class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
- """Exception for pydolphinscheduler process definition not assign error."""
-
-
-class PyDSConfException(PyDSBaseException):
- """Exception for pydolphinscheduler configuration error."""
+__all__ = [
+ "Local",
+]
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py
new file mode 100644
index 0000000000..8a20ed9737
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DolphinScheduler local resource plugin."""
+
+import os
+from pathlib import Path
+
+from pydolphinscheduler.core.resource_plugin import ResourcePlugin
+from pydolphinscheduler.exceptions import PyResPluginException
+
+
+class Local(ResourcePlugin):
+ """Local object, declare local resource plugin for task and workflow to
dolphinscheduler.
+
+ :param prefix: A string representing the prefix of Local.
+
+ """
+
+ # [start init_method]
+ def __init__(self, prefix: str, *args, **kwargs):
+ super().__init__(prefix, *args, **kwargs)
+
+ # [end init_method]
+
+ # [start read_file_method]
+ def read_file(self, suf: str):
+ """Get the content of the file.
+
+ The address of the file is the prefix of the resource plugin plus the
parameter suf.
+ """
+ path = Path(self.prefix).joinpath(suf)
+ if not path.exists():
+ raise PyResPluginException("{} is not found".format(str(path)))
+ if not os.access(str(path), os.R_OK):
+ raise PyResPluginException(
+ "You don't have permission to access {}".format(self.prefix +
suf)
+ )
+ with open(path, "r") as f:
+ content = f.read()
+ return content
+
+ # [end read_file_method]
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
index 9a73535c8c..36ec4e87d0 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
@@ -50,6 +50,9 @@ class Shell(Task):
"raw_script",
}
+ ext: set = {".sh", ".zsh"}
+ ext_attr: str = "_raw_script"
+
def __init__(self, name: str, command: str, *args, **kwargs):
+ self._raw_script = command
super().__init__(name, TaskType.SHELL, *args, **kwargs)
- self.raw_script = command
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 3909077b98..cb06c21ae4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -18,12 +18,14 @@
"""Test Task class function."""
import logging
import re
-from unittest.mock import patch
+from unittest.mock import PropertyMock, patch
import pytest
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task, TaskRelation
+from pydolphinscheduler.exceptions import PyResPluginException
+from pydolphinscheduler.resources_plugin import Local
from tests.testing.task import Task as testTask
from tests.testing.task import TaskWithCode
@@ -252,6 +254,118 @@ def test_add_duplicate(caplog):
)
[email protected](
+ "val, expected",
+ [
+ ("a.sh", "echo Test task attribute ext_attr"),
+ ("a.zsh", "echo Test task attribute ext_attr"),
+ ("echo Test task attribute ext_attr", "echo Test task attribute
ext_attr"),
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.ext",
+ new_callable=PropertyMock,
+ return_value={".sh", ".zsh"},
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.ext_attr",
+ new_callable=PropertyMock,
+ return_value="_raw_script",
+)
+@patch(
+ "pydolphinscheduler.core.task.Task._raw_script",
+ create=True,
+ new_callable=PropertyMock,
+)
+@patch("pydolphinscheduler.core.task.Task.get_plugin")
+def test_task_ext_attr(
+ m_plugin, m_raw_script, m_ext_attr, m_ext, m_code_version, val, expected
+):
+ """Test task attribute ext_attr."""
+ m_plugin.return_value.read_file.return_value = expected
+ m_raw_script.return_value = val
+ task = Task("test_task_ext_attr", "test_task_ext_attr")
+ assert expected == getattr(task, "raw_script")
+
+
[email protected](
+ "attr, expected",
+ [
+ (
+ {
+ "name": "test_task_abtain_res_plugin",
+ "task_type": "TaskType",
+ "resource_plugin": Local("prefix"),
+ "process_definition": ProcessDefinition(
+ name="process_definition",
+ resource_plugin=Local("prefix"),
+ ),
+ },
+ "Local",
+ ),
+ (
+ {
+ "name": "test_task_abtain_res_plugin",
+ "task_type": "TaskType",
+ "resource_plugin": Local("prefix"),
+ },
+ "Local",
+ ),
+ (
+ {
+ "name": "test_task_abtain_res_plugin",
+ "task_type": "TaskType",
+ "process_definition": ProcessDefinition(
+ name="process_definition",
+ resource_plugin=Local("prefix"),
+ ),
+ },
+ "Local",
+ ),
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch("pydolphinscheduler.core.task.Task.get_content")
+def test_task_obtain_res_plugin(m_get_content, m_code_version, attr, expected):
+ """Test task obtaining resource plug-in."""
+ task = Task(**attr)
+ assert expected == task.get_plugin().__class__.__name__
+
+
[email protected](
+ "attr",
+ [
+ {
+ "name": "test_task_abtain_res_plugin",
+ "task_type": "TaskType",
+ "process_definition": ProcessDefinition(
+ name="process_definition",
+ ),
+ },
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch("pydolphinscheduler.core.task.Task.get_content")
+def test_task_obtain_res_plugin_exception(m_get_content, m_code_version, attr):
+ """Test task obtaining resource plug-in exception."""
+ with pytest.raises(
+ PyResPluginException,
+ match="The execution command of this task is a file, but the resource
plugin is empty",
+ ):
+ task = Task(**attr)
+ task.get_plugin()
+
+
@pytest.mark.parametrize(
"resources, expect",
[
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
index 70f367767c..319ad961f7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
@@ -97,7 +97,11 @@ def test_example_basic():
), f"We expect all examples is python script, but get {ex.name}."
# All except tutorial and __init__ is end with keyword "_example"
- if ex.stem not in ("tutorial", "tutorial_decorator") and ex.stem !=
"__init__":
+ if (
+ ex.stem
+ not in ("tutorial", "tutorial_decorator",
"tutorial_resource_plugin")
+ and ex.stem != "__init__"
+ ):
assert ex.stem.endswith(
"_example"
), f"We expect all examples script end with keyword '_example',
but get {ex.stem}."
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/__init__.py
similarity index 51%
copy from
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
copy to
dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/__init__.py
index 4d70a58637..0b6bdf360b 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
+++
b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/__init__.py
@@ -15,28 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-"""Exceptions for pydolphinscheduler."""
-
-
-class PyDSBaseException(Exception):
- """Base exception for pydolphinscheduler."""
-
-
-class PyDSParamException(PyDSBaseException):
- """Exception for pydolphinscheduler parameter verify error."""
-
-
-class PyDSTaskNoFoundException(PyDSBaseException):
- """Exception for pydolphinscheduler workflow task no found error."""
-
-
-class PyDSJavaGatewayException(PyDSBaseException):
- """Exception for pydolphinscheduler Java gateway error."""
-
-
-class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
- """Exception for pydolphinscheduler process definition not assign error."""
-
-
-class PyDSConfException(PyDSBaseException):
- """Exception for pydolphinscheduler configuration error."""
+"""Init resources_plugin package tests."""
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_local.py
b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_local.py
new file mode 100644
index 0000000000..82b196f75a
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_local.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test local resource plugin."""
+from pathlib import Path
+from unittest.mock import PropertyMock, patch
+
+import pytest
+
+from pydolphinscheduler.core import Task
+from pydolphinscheduler.exceptions import PyResPluginException
+from pydolphinscheduler.resources_plugin.local import Local
+from pydolphinscheduler.utils import file
+from tests.testing.file import delete_file
+
+file_name = "local_res.sh"
+file_content = "echo Test local res plugin"
+res_plugin_prefix = Path(__file__).parent
+file_path = res_plugin_prefix.joinpath(file_name)
+
+
[email protected]()
+def setup_crt_first():
+ """Set up and teardown about create file first and then delete it."""
+ file.write(content=file_content, to_path=file_path)
+ yield
+ delete_file(file_path)
+
+
[email protected](
+ "val, expected",
+ [
+ (file_name, file_content),
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.ext",
+ new_callable=PropertyMock,
+ return_value={
+ ".sh",
+ },
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.ext_attr",
+ new_callable=PropertyMock,
+ return_value="_raw_script",
+)
+@patch(
+ "pydolphinscheduler.core.task.Task._raw_script",
+ create=True,
+ new_callable=PropertyMock,
+)
+def test_task_obtain_res_plugin(
+ m_raw_script, m_ext_attr, m_ext, m_code_version, val, expected,
setup_crt_first
+):
+ """Test task obtaining resource plug-in."""
+ m_raw_script.return_value = val
+ task = Task(
+ name="test_task_ext_attr",
+ task_type="type",
+ resource_plugin=Local(str(res_plugin_prefix)),
+ )
+ assert expected == getattr(task, "raw_script")
+
+
[email protected](
+ "attr, expected",
+ [({"prefix": res_plugin_prefix, "file_name": file_name}, file_content)],
+)
+def test_local_res_read_file(attr, expected, setup_crt_first):
+ """Test the read_file function of the local resource plug-in."""
+ local = Local(str(attr.get("prefix")))
+ local.read_file(attr.get("file_name"))
+ assert expected == local.read_file(file_name)
+
+
[email protected](
+ "attr",
+ [
+ {"prefix": res_plugin_prefix, "file_name": file_name},
+ ],
+)
+def test_local_res_file_not_found(attr):
+ """Test local resource plugin file does not exist."""
+ with pytest.raises(
+ PyResPluginException,
+ match=".* is not found",
+ ):
+ local = Local(str(attr.get("prefix")))
+ local.read_file(attr.get("file_name"))
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
index e2c87d8e7d..9344ac2bb0 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
@@ -17,12 +17,28 @@
"""Test Task shell."""
-
+from pathlib import Path
from unittest.mock import patch
import pytest
+from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.shell import Shell
+from pydolphinscheduler.utils import file
+from tests.testing.file import delete_file
+
+file_name = "local_res.sh"
+file_content = 'echo "test res_local"'
+res_plugin_prefix = Path(__file__).parent
+file_path = res_plugin_prefix.joinpath(file_name)
+
+
[email protected]
+def setup_crt_first():
+ """Set up and teardown about create file first and then delete it."""
+ file.write(content=file_content, to_path=file_path)
+ yield
+ delete_file(file_path)
@pytest.mark.parametrize(
@@ -90,3 +106,28 @@ def test_shell_get_define():
shell = Shell(name, command)
print(shell.get_define())
assert shell.get_define() == expect
+
+
[email protected](
+ "attr, expect",
+ [
+ (
+ {
+ "name": "test-local-res-command-content",
+ "command": file_name,
+ "resource_plugin": Local(str(res_plugin_prefix)),
+ },
+ file_content,
+ )
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_resources_local_shell_command_content(
+ mock_code_version, attr, expect, setup_crt_first
+):
+ """Test task shell task command content through the local resource
plug-in."""
+ task = Shell(**attr)
+ assert expect == getattr(task, "raw_script")