This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9652964c94 [python] Add resource plugin for python, dataX, CustomDataX and Sql (#12135)
9652964c94 is described below
commit 9652964c946f06de315211d551babf3c6e8dc159
Author: chenrj <[email protected]>
AuthorDate: Thu Oct 13 19:14:54 2022 +0800
[python] Add resource plugin for python, dataX, CustomDataX and Sql (#12135)
---
.../src/pydolphinscheduler/constants.py | 2 +
.../src/pydolphinscheduler/core/task.py | 29 ++++---
.../src/pydolphinscheduler/tasks/datax.py | 10 ++-
.../src/pydolphinscheduler/tasks/python.py | 33 ++++----
.../src/pydolphinscheduler/tasks/sql.py | 7 +-
.../pydolphinscheduler/tests/tasks/test_datax.py | 91 +++++++++++++++++++++-
.../pydolphinscheduler/tests/tasks/test_python.py | 58 +++++++++++++-
.../pydolphinscheduler/tests/tasks/test_sql.py | 45 ++++++++++-
8 files changed, 239 insertions(+), 36 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index de5ce26002..bedbbf2f5e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -118,3 +118,5 @@ class Symbol(str):
SLASH = "/"
POINT = "."
+ COMMA = ","
+ UNDERLINE = "_"
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 26b71c186d..3fec31fd67 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -17,6 +17,7 @@
"""DolphinScheduler Task and TaskRelation object."""
import copy
+import types
from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@@ -24,6 +25,7 @@ from pydolphinscheduler import configuration
from pydolphinscheduler.constants import (
Delimiter,
ResourceKey,
+ Symbol,
TaskFlag,
TaskPriority,
TaskTimeoutFlag,
@@ -114,7 +116,7 @@ class Task(Base):
_task_custom_attr: set = set()
ext: set = None
- ext_attr: str = None
+ ext_attr: Union[str, types.FunctionType] = None
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
@@ -271,23 +273,26 @@ class Task(Base):
"""Get the file content according to the resource plugin."""
if self.ext_attr is None and self.ext is None:
return
-
_ext_attr = getattr(self, self.ext_attr)
-
if _ext_attr is not None:
- if _ext_attr.endswith(tuple(self.ext)):
+ if isinstance(_ext_attr, str) and
_ext_attr.endswith(tuple(self.ext)):
res = self.get_plugin()
content = res.read_file(_ext_attr)
- setattr(self, self.ext_attr.lstrip("_"), content)
+ setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), content)
else:
- index = _ext_attr.rfind(".")
- if index != -1:
- raise ValueError(
- "This task does not support files with suffix {}, only
supports {}".format(
- _ext_attr[index:], ",".join(str(suf) for suf in
self.ext)
+ if self.resource_plugin is not None or (
+ self.process_definition is not None
+ and self.process_definition.resource_plugin is not None
+ ):
+ index = _ext_attr.rfind(Symbol.POINT)
+ if index != -1:
+ raise ValueError(
+ "This task does not support files with suffix {},
only supports {}".format(
+ _ext_attr[index:],
+ Symbol.COMMA.join(str(suf) for suf in
self.ext),
+ )
)
- )
- setattr(self, self.ext_attr.lstrip("_"), _ext_attr)
+ setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE),
_ext_attr)
def __hash__(self):
return hash(self.code)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py
index f881a67de9..945f7824e4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py
@@ -34,6 +34,9 @@ class CustomDataX(Task):
_task_custom_attr = {"custom_config", "json", "xms", "xmx"}
+ ext: set = {".json"}
+ ext_attr: str = "_json"
+
def __init__(
self,
name: str,
@@ -43,9 +46,9 @@ class CustomDataX(Task):
*args,
**kwargs
):
+ self._json = json
super().__init__(name, TaskType.DATAX, *args, **kwargs)
self.custom_config = self.CUSTOM_CONFIG
- self.json = json
self.xms = xms
self.xmx = xmx
@@ -76,6 +79,9 @@ class DataX(Task):
"xmx",
}
+ ext: set = {".sql"}
+ ext_attr: str = "_sql"
+
def __init__(
self,
name: str,
@@ -92,8 +98,8 @@ class DataX(Task):
*args,
**kwargs
):
+ self._sql = sql
super().__init__(name, TaskType.DATAX, *args, **kwargs)
- self.sql = sql
self.custom_config = self.CUSTOM_CONFIG
self.datasource_name = datasource_name
self.datatarget_name = datatarget_name
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
index 52903d48d9..de16a9931c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
@@ -55,15 +55,16 @@ class Python(Task):
want to execute.
"""
- _task_custom_attr = {
- "raw_script",
- }
+ _task_custom_attr = {"raw_script", "definition"}
+
+ ext: set = {".py"}
+ ext_attr: Union[str, types.FunctionType] = "_definition"
def __init__(
self, name: str, definition: Union[str, types.FunctionType], *args,
**kwargs
):
+ self._definition = definition
super().__init__(name, TaskType.PYTHON, *args, **kwargs)
- self.definition = definition
def _build_exe_str(self) -> str:
"""Build executable string from given definition.
@@ -71,32 +72,34 @@ class Python(Task):
Attribute ``self.definition`` almost is a function, we need to call
this function after parsing it
to string. The easier way to call a function is using syntax
``func()`` and we use it to call it too.
"""
- if isinstance(self.definition, types.FunctionType):
- py_function = inspect.getsource(self.definition)
- func_str = f"{py_function}{self.definition.__name__}()"
+ definition = getattr(self, "definition")
+ if isinstance(definition, types.FunctionType):
+ py_function = inspect.getsource(definition)
+ func_str = f"{py_function}{definition.__name__}()"
else:
pattern = re.compile("^def (\\w+)\\(")
- find = pattern.findall(self.definition)
+ find = pattern.findall(definition)
if not find:
log.warning(
"Python definition is simple script instead of function,
with value %s",
- self.definition,
+ definition,
)
- return self.definition
+ return definition
# Keep function str and function callable always have one blank
line
func_str = (
- f"{self.definition}{find[0]}()"
- if self.definition.endswith("\n")
- else f"{self.definition}\n{find[0]}()"
+ f"{definition}{find[0]}()"
+ if definition.endswith("\n")
+ else f"{definition}\n{find[0]}()"
)
return func_str
@property
def raw_script(self) -> str:
"""Get python task define attribute `raw_script`."""
- if isinstance(self.definition, (str, types.FunctionType)):
+ if isinstance(getattr(self, "definition"), (str, types.FunctionType)):
return self._build_exe_str()
else:
raise PyDSParamException(
- "Parameter definition do not support % for now.",
type(self.definition)
+ "Parameter definition do not support % for now.",
+ type(getattr(self, "definition")),
)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
index 716a024daf..4bebf8379d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
@@ -59,6 +59,9 @@ class Sql(Task):
"display_rows",
}
+ ext: set = {".sql"}
+ ext_attr: str = "_sql"
+
def __init__(
self,
name: str,
@@ -71,8 +74,8 @@ class Sql(Task):
*args,
**kwargs
):
+ self._sql = sql
super().__init__(name, TaskType.SQL, *args, **kwargs)
- self.sql = sql
self.param_sql_type = sql_type
self.datasource_name = datasource_name
self.pre_statements = pre_statements or []
@@ -101,7 +104,7 @@ class Sql(Task):
"|(.* |)update |(.* |)truncate |(.* |)alter |(.* |)create ).*"
)
pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
- if pattern_select.match(self.sql) is None:
+ if pattern_select.match(self._sql) is None:
return SqlType.NOT_SELECT
else:
return SqlType.SELECT
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
index 5d1890e83d..95f65b3155 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
@@ -16,12 +16,28 @@
# under the License.
"""Test Task DataX."""
-
+from pathlib import Path
from unittest.mock import patch
import pytest
+from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
+from pydolphinscheduler.utils import file
+from tests.testing.file import delete_file
+
+
[email protected]()
+def setup_crt_first(request):
+ """Set up and teardown about create file first and then delete it."""
+ file_content = request.param.get("file_content")
+ file_path = request.param.get("file_path")
+ file.write(
+ content=file_content,
+ to_path=file_path,
+ )
+ yield
+ delete_file(file_path)
@patch(
@@ -122,3 +138,76 @@ def test_custom_datax_get_define(json_template):
):
task = CustomDataX(name, json_template)
assert task.get_define() == expect
+
+
[email protected](
+ "setup_crt_first",
+ [
+ {
+ "file_path": Path(__file__).parent.joinpath("local_res.sql"),
+ "file_content": "test local resource",
+ }
+ ],
+ indirect=True,
+)
[email protected](
+ "attr, expect",
+ [
+ (
+ {
+ "name": "task_datax",
+ "datasource_name": "first_mysql",
+ "datatarget_name": "second_mysql",
+ "sql": "local_res.sql",
+ "target_table": "target_table",
+ "resource_plugin": Local(str(Path(__file__).parent)),
+ },
+ "test local resource",
+ ),
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_resources_local_datax_command_content(
+ mock_code_version, attr, expect, setup_crt_first
+):
+ """Test task datax sql content through the local resource plug-in."""
+ datax = DataX(**attr)
+ assert expect == getattr(datax, "sql")
+
+
[email protected](
+ "setup_crt_first",
+ [
+ {
+ "file_path": Path(__file__).parent.joinpath("local_res.json"),
+ "file_content": '{content: "test local resource"}',
+ }
+ ],
+ indirect=True,
+)
[email protected](
+ "attr, expect",
+ [
+ (
+ {
+ "name": "task_custom_datax",
+ "json": "local_res.json",
+ "resource_plugin": Local(str(Path(__file__).parent)),
+ },
+ '{content: "test local resource"}',
+ ),
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_resources_local_custom_datax_command_content(
+ mock_code_version, attr, expect, setup_crt_first
+):
+ """Test task CustomDataX json content through the local resource
plug-in."""
+ custom_datax = CustomDataX(**attr)
+ assert expect == getattr(custom_datax, "json")
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
index e8f7f10d77..77aa10625b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
@@ -16,26 +16,42 @@
# under the License.
"""Test Task python."""
-
-
+from pathlib import Path
from unittest.mock import patch
import pytest
from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.python import Python
+from pydolphinscheduler.utils import file
+from tests.testing.file import delete_file
def foo(): # noqa: D103
print("hello world.")
[email protected]()
+def setup_crt_first(request):
+ """Set up and teardown about create file first and then delete it."""
+ file_content = request.param.get("file_content")
+ file_path = request.param.get("file_path")
+ file.write(
+ content=file_content,
+ to_path=file_path,
+ )
+ yield
+ delete_file(file_path)
+
+
@pytest.mark.parametrize(
"attr, expect",
[
(
{"definition": "print(1)"},
{
+ "definition": "print(1)",
"rawScript": "print(1)",
"localParams": [],
"resourceList": [],
@@ -47,6 +63,7 @@ def foo(): # noqa: D103
(
{"definition": "def foo():\n print('I am foo')"},
{
+ "definition": "def foo():\n print('I am foo')",
"rawScript": "def foo():\n print('I am foo')\nfoo()",
"localParams": [],
"resourceList": [],
@@ -58,6 +75,7 @@ def foo(): # noqa: D103
(
{"definition": foo},
{
+ "definition": foo,
"rawScript": 'def foo(): # noqa: D103\n print("hello
world.")\nfoo()',
"localParams": [],
"resourceList": [],
@@ -122,6 +140,7 @@ def test_python_get_define(name, script_code, raw):
"delayTime": 0,
"taskType": "PYTHON",
"taskParams": {
+ "definition": script_code,
"resourceList": [],
"localParams": [],
"rawScript": raw,
@@ -145,3 +164,38 @@ def test_python_get_define(name, script_code, raw):
):
shell = Python(name, script_code)
assert shell.get_define() == expect
+
+
[email protected](
+ "setup_crt_first",
+ [
+ {
+ "file_path": Path(__file__).parent.joinpath("local_res.py"),
+ "file_content": "test local resource",
+ }
+ ],
+ indirect=True,
+)
[email protected](
+ "attr, expect",
+ [
+ (
+ {
+ "name": "task_python",
+ "definition": "local_res.py",
+ "resource_plugin": Local(str(Path(__file__).parent)),
+ },
+ "test local resource",
+ ),
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_resources_local_python_command_content(
+ mock_code_version, attr, expect, setup_crt_first
+):
+ """Test task Python definition content through the local resource
plug-in."""
+ python = Python(**attr)
+ assert expect == getattr(python, "definition")
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
index ba9daa9b2d..a22d9206d0 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
@@ -16,13 +16,28 @@
# under the License.
"""Test Task Sql."""
-
-
+from pathlib import Path
from unittest.mock import patch
import pytest
+from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.sql import Sql, SqlType
+from pydolphinscheduler.utils import file
+from tests.testing.file import delete_file
+
+file_name = "local_res.sql"
+file_content = "select 1"
+res_plugin_prefix = Path(__file__).parent
+file_path = res_plugin_prefix.joinpath(file_name)
+
+
[email protected]
+def setup_crt_first():
+ """Set up and teardown about create file first and then delete it."""
+ file.write(content=file_content, to_path=file_path)
+ yield
+ delete_file(file_path)
@pytest.mark.parametrize(
@@ -165,3 +180,29 @@ def test_sql_get_define(mock_datasource):
):
task = Sql(name, datasource_name, command)
assert task.get_define() == expect
+
+
[email protected](
+ "attr, expect",
+ [
+ (
+ {
+ "name": "test-sql-local-res",
+ "sql": file_name,
+ "datasource_name": "test_datasource",
+ "resource_plugin": Local(str(res_plugin_prefix)),
+ },
+ file_content,
+ )
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_resources_local_sql_command_content(
+ mock_code_version, attr, expect, setup_crt_first
+):
+ """Test sql content through the local resource plug-in."""
+ sql = Sql(**attr)
+ assert expect == getattr(sql, "sql")