This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 7baac8f feat: Introduce metaclass to convert JavaObject and refactor
datasource (#75)
7baac8f is described below
commit 7baac8f9d9dbd4d77b5c51b9977ddd85f313dcf8
Author: Jay Chung <[email protected]>
AuthorDate: Mon Feb 27 14:47:47 2023 +0800
feat: Introduce metaclass to convert JavaObject and refactor datasource
(#75)
* Introduce a metaclass that makes converting a JavaObject into a Python object easier
* Add more details for datasource, including host, port, schema etc.
* Change the other modules to use the new datasource model
---
README.md | 2 +-
src/pydolphinscheduler/constants.py | 7 +
src/pydolphinscheduler/core/__init__.py | 2 -
src/pydolphinscheduler/core/database.py | 62 -------
src/pydolphinscheduler/java_gateway.py | 12 +-
.../{core/__init__.py => models/connection.py} | 26 +--
src/pydolphinscheduler/models/datasource.py | 182 +++++++++++++++++++++
src/pydolphinscheduler/models/meta.py | 122 ++++++++++++++
src/pydolphinscheduler/tasks/datax.py | 44 ++++-
src/pydolphinscheduler/tasks/procedure.py | 29 +++-
src/pydolphinscheduler/tasks/sql.py | 22 ++-
tests/core/test_database.py | 54 ------
tests/core/test_yaml_workflow.py | 5 +-
.../core => tests/models}/__init__.py | 14 +-
tests/models/test_database.py | 73 +++++++++
tests/tasks/test_datax.py | 6 +-
tests/tasks/test_procedure.py | 9 +-
tests/tasks/test_sql.py | 13 +-
18 files changed, 506 insertions(+), 178 deletions(-)
diff --git a/README.md b/README.md
index 8f56339..3bbcef8 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@ under the License.
[](https://raw.githubusercontent.com/apache/dolphinscheduler-sdk-python/main/LICENSE)
[](https://pypi.org/project/apache-dolphinscheduler/)
[](https://pepy.tech/project/apache-dolphinscheduler)
-[](https://codecov.io/github/apache/dolphinscheduler-sdk-python?branch=main)
+
[](https://github.com/psf/black)
[](https://pycqa.github.io/isort)
[](https://github.com/apache/dolphinscheduler-sdk-python/actions/workflows/ci.yaml)
diff --git a/src/pydolphinscheduler/constants.py
b/src/pydolphinscheduler/constants.py
index 1bd3c9f..b28d3b6 100644
--- a/src/pydolphinscheduler/constants.py
+++ b/src/pydolphinscheduler/constants.py
@@ -137,3 +137,10 @@ class Version(str):
DS = "dolphinscheduler"
FILE_NAME = "version_ext"
+
+
class Keyword(str):
    """Constants for keyword.

    Holds the key names ``id`` and ``type`` that identify a datasource.
    """

    DATASOURCE_ID = "id"  # key name of the datasource id
    DATASOURCE_TYPE = "type"  # key name of the datasource type
diff --git a/src/pydolphinscheduler/core/__init__.py
b/src/pydolphinscheduler/core/__init__.py
index a23c768..5a83e75 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/src/pydolphinscheduler/core/__init__.py
@@ -17,13 +17,11 @@
"""Init pydolphinscheduler.core package."""
-from pydolphinscheduler.core.database import Database
from pydolphinscheduler.core.engine import Engine
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.core.workflow import Workflow
__all__ = [
- "Database",
"Engine",
"Workflow",
"Task",
diff --git a/src/pydolphinscheduler/core/database.py
b/src/pydolphinscheduler/core/database.py
deleted file mode 100644
index 0f5b4bb..0000000
--- a/src/pydolphinscheduler/core/database.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Module database."""
-
-from typing import Dict
-
-from py4j.protocol import Py4JJavaError
-
-from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import gateway
-
-
-class Database(dict):
- """database object, get information about database.
-
- You provider database_name contain connection information, it decisions
which
- database type and database instance would run task.
- """
-
- def __init__(self, database_name: str, type_key, database_key, *args,
**kwargs):
- super().__init__(*args, **kwargs)
- self._database = {}
- self.database_name = database_name
- self[type_key] = self.database_type
- self[database_key] = self.database_id
-
- @property
- def database_type(self) -> str:
- """Get database type from java gateway, a wrapper for
:func:`get_database_info`."""
- return self.get_database_info(self.database_name).get("type")
-
- @property
- def database_id(self) -> str:
- """Get database id from java gateway, a wrapper for
:func:`get_database_info`."""
- return self.get_database_info(self.database_name).get("id")
-
- def get_database_info(self, name) -> Dict:
- """Get database info from java gateway, contains database id, type,
name."""
- if self._database:
- return self._database
- else:
- try:
- self._database = gateway.get_datasource_info(name)
- # Handler database source do not exists error, for now we just
terminate the process.
- except Py4JJavaError as ex:
- raise PyDSParamException(str(ex.java_exception))
- return self._database
diff --git a/src/pydolphinscheduler/java_gateway.py
b/src/pydolphinscheduler/java_gateway.py
index 7ca6a79..91243f6 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -110,9 +110,15 @@ class GatewayEntryPoint:
"""Get the java gateway version, expected to be equal with
pydolphinscheduler."""
return self.gateway.entry_point.getGatewayVersion()
- def get_datasource_info(self, name: str):
- """Get datasource info through java gateway."""
- return self.gateway.entry_point.getDatasourceInfo(name)
def get_datasource(self, name: str, type: Optional[str] = None):
    """Query one datasource through the java gateway.

    The lookup is done by ``name``; when ``type`` is given it is passed along so the
    result can be filtered by datasource type as well.

    :param name: name of the datasource to query.
    :param type: optional datasource type, only used to filter the result.
    :return: the value returned by the gateway entry point's ``getDatasource``.
    """
    entry_point = self.gateway.entry_point
    return entry_point.getDatasource(name, type)
def get_resources_file_info(self, program_type: str, main_package: str):
"""Get resources file info through java gateway."""
diff --git a/src/pydolphinscheduler/core/__init__.py
b/src/pydolphinscheduler/models/connection.py
similarity index 71%
copy from src/pydolphinscheduler/core/__init__.py
copy to src/pydolphinscheduler/models/connection.py
index a23c768..ca2cc70 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/src/pydolphinscheduler/models/connection.py
@@ -15,16 +15,20 @@
# specific language governing permissions and limitations
# under the License.
-"""Init pydolphinscheduler.core package."""
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.engine import Engine
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.core.workflow import Workflow
+"""DolphinScheduler Connection object.
-__all__ = [
- "Database",
- "Engine",
- "Workflow",
- "Task",
-]
+Including the basic information of database connection.
+"""
+
+from typing import NamedTuple
+
+
class Connection(NamedTuple):
    """Where a database lives and how to log in to it."""

    host: str  # hostname or IP address of the database server
    port: int  # port the database server listens on
    schema: str  # database (schema) name
    username: str  # login user name
    password: str  # login password
diff --git a/src/pydolphinscheduler/models/datasource.py
b/src/pydolphinscheduler/models/datasource.py
new file mode 100644
index 0000000..5602c30
--- /dev/null
+++ b/src/pydolphinscheduler/models/datasource.py
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Module database."""
+import json
+import re
+from typing import NamedTuple, Optional
+
+from py4j.java_gateway import JavaObject
+
+from pydolphinscheduler.java_gateway import gateway
+from pydolphinscheduler.models.connection import Connection
+from pydolphinscheduler.models.meta import ModelMeta
+
+
class TaskUsage(NamedTuple):
    """Datasource fields a task needs, mirroring what the web UI stores for a task."""

    id: int  # datasource id
    type: str  # datasource type as used in task params
+
+
class Datasource(metaclass=ModelMeta):
    """Model datasource, communicate with DolphinScheduler API server and convert Java Object into Python.

    We use metaclass :class:`pydolphinscheduler.models.meta.ModelMeta` to convert Java Object into Python,
    so the code in this class only calls Java API methods.

    The datasource name (optionally together with its type) decides which database type and which
    database instance a task runs against.

    :param id_: datasource id, the primary key of table t_ds_datasource.
    :param name: datasource name, part of unique key (:param:``type_``, :param:``name``) for datasource
        object. We support querying the datasource either by name or by (type_ + name), but the name
        must be unique when you want to query with the name only.
    :param note: datasource description. A note for current datasource.
    :param type_: datasource type, part of unique key (:param:``type_``, :param:``name``) for datasource.
        It is a datasource type code instead of datasource type name. Optional when you query datasource by
        name only, but required when you create it.
    :param user_id: id of the user who created this datasource.
    :param connection_params: datasource connection detail, including protocol, host, port, schema etc.
        In json format and just like this:

        .. code-block:: json

            {
                "user": "root",
                "password": "mysql",
                "address": "jdbc:mysql://127.0.0.1:3306",
                "database": "test",
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
                "driverClassName": "com.mysql.cj.jdbc.Driver",
                "validationQuery": "select 1"
            }

    """

    # Matches ``jdbc:<subprotocol>://<host>:<port>...``. The host stops at the first ``/``, ``:``,
    # ``;``, ``?``, ``,`` or whitespace, so a ``host:port`` pair appearing later in the URL (e.g. in
    # query parameters) is not mistaken for the datasource address. A bracketed IPv6 literal such as
    # ``[::1]`` is also accepted as host.
    _PATTERN = re.compile(
        r"jdbc:[^/]*?://(?P<host>\[[^\]]*\]|[^/:;?,\s]+):(?P<port>\d+)"
    )

    _DATABASE_TYPE_MAP = dict(
        mysql=0,
        postgresql=1,
        hive=2,
        spark=3,
        clickhouse=4,
        oracle=5,
        sqlserver=6,
        db2=7,
        presto=8,
        h2=9,
        redshift=10,
        dameng=11,
        starrocks=12,
    )

    def __init__(
        self,
        type_: str,
        name: str,
        connection_params: str,
        user_id: Optional[int] = None,
        id_: Optional[int] = None,
        note: Optional[str] = None,
    ):
        self.id = id_
        self.name = name
        self.note = note
        # TODO try to handle type_ in metaclass
        self.type_: JavaObject = type_
        self.user_id = user_id
        self.connection_params = connection_params

    @classmethod
    def get(
        cls, datasource_name: str, datasource_type: Optional[str] = None
    ) -> "Datasource":
        """Get single datasource.

        A ``JavaObject`` returned by the java gateway is converted into a Python object by
        :class:`pydolphinscheduler.models.meta.ModelMeta`.

        :param datasource_name: datasource name
        :param datasource_type: datasource type, if not provided, will get datasource by name only
        :raises ValueError: if the gateway finds no matching datasource.
        """
        datasource = gateway.get_datasource(datasource_name, datasource_type)
        if datasource is None:
            raise ValueError(
                f"Datasource with name: {datasource_name} and type: {datasource_type} not found."
            )
        return datasource

    @classmethod
    def get_task_usage_4j(
        cls, datasource_name: str, datasource_type: Optional[str] = None
    ) -> TaskUsage:
        """Get the necessary information of datasource for task usage in web UI.

        :return: the datasource id and its upper-cased type.
        """
        datasource: "Datasource" = cls.get(datasource_name, datasource_type)
        return TaskUsage(
            id=datasource.id,
            type=datasource.type.upper(),
        )

    @property
    def connection(self) -> Connection:
        """Parse dolphinscheduler connection_params to Connection.

        :raises ValueError: if ``jdbcUrl`` is missing or host and port cannot be parsed from it.
        """
        data = json.loads(self.connection_params)

        jdbc_url = data.get("jdbcUrl")
        address_match = (
            self._PATTERN.match(jdbc_url) if isinstance(jdbc_url, str) else None
        )
        if address_match is None:
            # Do not echo the URL itself: JDBC URLs may carry credentials in their parameters.
            raise ValueError(
                f"Cannot parse host and port from the jdbcUrl of datasource {self.name!r}, "
                "expected a URL like ``jdbc:<type>://<host>:<port>/...``."
            )

        return Connection(
            host=address_match.group("host"),
            port=int(address_match.group("port")),
            schema=data.get("database"),
            username=data.get("user"),
            password=data.get("password"),
        )

    @property
    def type(self) -> str:
        """Property datasource type."""
        # ``type_`` is a py4j JavaObject when built from the gateway, but the constructor also
        # accepts a plain string; return such a string unchanged instead of failing on ``getDescp``.
        if isinstance(self.type_, str):
            return self.type_
        return self.type_.getDescp()

    @property
    def type_code(self) -> str:
        """Property datasource type code."""
        return self.type_.getCode()

    @property
    def host(self) -> str:
        """Property datasource host, such as ``127.0.0.1`` or ``localhost``."""
        return self.connection.host

    @property
    def port(self) -> int:
        """Property datasource port, such as ``3306`` or ``5432``."""
        return int(self.connection.port)

    @property
    def username(self) -> str:
        """Property datasource username, such as ``root`` or ``postgres``."""
        return self.connection.username

    @property
    def password(self) -> str:
        """Property datasource password."""
        return self.connection.password

    @property
    def schema(self) -> str:
        """Property datasource schema."""
        return self.connection.schema
diff --git a/src/pydolphinscheduler/models/meta.py
b/src/pydolphinscheduler/models/meta.py
new file mode 100644
index 0000000..1d7f2d8
--- /dev/null
+++ b/src/pydolphinscheduler/models/meta.py
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Metadata class for module models.
+
+This module contains the ModelMeta class, which is used to convert
``py4j.java_gateway.JavaObject`` to
+``pydolphinscheduler.models`` object. this is useful when you communicate with
the DolphinScheduler
+server to get some resource from database, but you want to make sure the
return object is a in Python
+object.
+"""
+
+from functools import wraps
+from inspect import signature
+from typing import Dict, Tuple
+
+from py4j.java_gateway import JavaObject
+
+from pydolphinscheduler.utils.string import snake2camel
+
+
class ModelMeta(type):
    """Metaclass converting ``py4j.java_gateway.JavaObject`` to python object more easily.

    Every public ``classmethod`` declared on a class using this metaclass is wrapped by
    :func:`j2p`. When the method returns a ``JavaObject``, that object is converted into an
    instance of the class, built from the Java getters matching the ``__init__`` parameters.
    """

    _FUNC_INIT = "__init__"
    _PARAM_SELF = "self"

    def __new__(mcs, name: str, bases: Tuple, attrs: Dict):
        """Create a new class, wrapping its public classmethods with :func:`j2p`.

        :raises TypeError: if the class body does not define ``__init__``.
        """
        if mcs._FUNC_INIT not in attrs:
            raise TypeError(
                "Class with metaclass %s must have %s method"
                % (mcs.__name__, mcs._FUNC_INIT)
            )

        sig = signature(attrs.get(mcs._FUNC_INIT))
        # ``__init__`` parameter names in declaration order, without ``self``.
        init_params = [
            param_name
            for param_name in sig.parameters
            if param_name != mcs._PARAM_SELF
        ]

        # Only existing keys are reassigned, so mutating ``attrs`` while iterating is safe.
        for attr_name, attr_value in attrs.items():
            if isinstance(attr_value, classmethod) and not attr_name.startswith("__"):
                attrs[attr_name] = mcs.j2p(attr_value, name, attrs, init_params)
        return super(ModelMeta, mcs).__new__(mcs, name, bases, attrs)

    @classmethod
    def j2p(mcs, cm: classmethod, name: str, attrs: Dict, params=None):
        """Convert JavaObject to Python object according attribute in the ``__init__`` method.

        ``py4j.java_gateway.JavaObject`` return the Java object from the DolphinScheduler server, we can
        access the Java object attribute by ``getAttrName`` method. This method try to assign the Java object
        attribute to the Python object attribute according the attribute in python ``__init__`` method.

        For example, If the method return value is ``py4j.java_gateway.JavaObject`` we temporary call it
        ``JObject``, and we create a ``PObject`` object in Python, the ``__init__`` method is:

        .. code-block:: python

            def __init__(
                self,
                name: str,
                description: str
            ):
                self.name = name
                self.description = description

        Because the ``name`` and ``description`` is the attribute in the ``__init__`` method, so this method
        will try to call method ``getName`` and ``getDescription`` from the ``JObject`` and assign the return
        value to the ``PObject`` attribute. Just like this:

        .. code-block:: python

            return PObject(name=JObject.getName(), description=JObject.getDescription())

        :param cm: the ``classmethod`` to wrap.
        :param name: name of the class being created; kept for backward compatibility, unused.
        :param attrs: namespace of the class being created; kept for backward compatibility, unused.
        :param params: ``__init__`` parameter names (without ``self``) in declaration order.
        :return: a new ``classmethod`` wrapping ``cm``.
        """
        func = cm.__func__
        init_params = tuple(params or ())

        @wraps(func)
        def wrapper(cls, *args, **kwargs):
            # ``cls`` is the real class (or subclass) the method is called on, so the converted
            # object is a genuine instance of it instead of an instance of a throwaway copy.
            method_result = func(cls, *args, **kwargs)

            # skip convert if method result is not JavaObject, they maybe some property method
            if not isinstance(method_result, JavaObject):
                return method_result

            obj_init_params = [
                getattr(method_result, mcs.py4j_attr_func_name(param))()
                for param in init_params
            ]
            return cls(*obj_init_params)

        # Keep classmethod semantics so the method also works when called on an instance.
        return classmethod(wrapper)

    @classmethod
    def py4j_attr_func_name(mcs, name: str) -> str:
        """Convert python attribute name to py4j java attribute name.

        Python attribute name is snake case, but py4j java attribute name is camel case. This method
        will convert snake case to camel case with adding the ``get`` prefix. for example:

        - attr -> getAttr
        - attr_name -> getAttrName
        - attr_ -> getAttr
        """
        return snake2camel(f"get_{name}")
diff --git a/src/pydolphinscheduler/tasks/datax.py
b/src/pydolphinscheduler/tasks/datax.py
index 945f782..1dfa89c 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -20,8 +20,8 @@
from typing import Dict, List, Optional
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.database import Database
from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.models.datasource import Datasource
class CustomDataX(Task):
@@ -57,12 +57,21 @@ class DataX(Task):
"""Task DataX object, declare behavior for DataX task to dolphinscheduler.
It should run database datax job in multiply sql link engine, such as:
+
- MySQL
- Oracle
- Postgresql
- SQLServer
+
You provider datasource_name and datatarget_name contain connection
information, it decisions which
database type and database instance would synchronous data.
+
+ :param name: task name.
+ :param datasource_name: source database name for task datax to extract
data.
+ :param datatarget_name: target database name for task datax to load data.
+ :param sql: sql statement for task datax to extract data form source
database.
+ :param target_table: target table name for task datax to load data into
target database.
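+ :param datasource_type: source database type, optional. It is only used to filter datasources
+ sharing ``datasource_name``, so it is needed when that name alone is not unique.
+ :param datatarget_type: target database type, optional. Used like ``datasource_type``, but
+ for ``datatarget_name``.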
"""
CUSTOM_CONFIG = 0
@@ -89,6 +98,8 @@ class DataX(Task):
datatarget_name: str,
sql: str,
target_table: str,
+ datasource_type: Optional[str] = None,
+ datatarget_type: Optional[str] = None,
job_speed_byte: Optional[int] = 0,
job_speed_record: Optional[int] = 1000,
pre_statements: Optional[List[str]] = None,
@@ -101,7 +112,9 @@ class DataX(Task):
self._sql = sql
super().__init__(name, TaskType.DATAX, *args, **kwargs)
self.custom_config = self.CUSTOM_CONFIG
+ self.datasource_type = datasource_type
self.datasource_name = datasource_name
+ self.datatarget_type = datatarget_type
self.datatarget_name = datatarget_name
self.target_table = target_table
self.job_speed_byte = job_speed_byte
@@ -111,6 +124,28 @@ class DataX(Task):
self.xms = xms
self.xmx = xmx
@property
def source_params(self) -> Dict:
    """Get source params for datax task: the ``dsType`` and ``dataSource`` keys."""
    return self._datasource_params(
        self.datasource_name, self.datasource_type, "dsType", "dataSource"
    )

@property
def target_params(self) -> Dict:
    """Get target params for datax task: the ``dtType`` and ``dataTarget`` keys."""
    return self._datasource_params(
        self.datatarget_name, self.datatarget_type, "dtType", "dataTarget"
    )

@staticmethod
def _datasource_params(
    name: str, type_: Optional[str], type_key: str, id_key: str
) -> Dict:
    """Look up datasource ``name`` (filtered by ``type_`` if given) and map its type and id to the given keys."""
    task_usage = Datasource.get_task_usage_4j(name, type_)
    return {
        type_key: task_usage.type,
        id_key: task_usage.id,
    }
+
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
"""Override Task.task_params for datax task.
@@ -119,9 +154,6 @@ class DataX(Task):
directly set as python property, so we Override Task.task_params here.
"""
params = super().task_params
- datasource = Database(self.datasource_name, "dsType", "dataSource")
- params.update(datasource)
-
- datatarget = Database(self.datatarget_name, "dtType", "dataTarget")
- params.update(datatarget)
+ params.update(self.source_params)
+ params.update(self.target_params)
return params
diff --git a/src/pydolphinscheduler/tasks/procedure.py
b/src/pydolphinscheduler/tasks/procedure.py
index 6383e07..5a9d87b 100644
--- a/src/pydolphinscheduler/tasks/procedure.py
+++ b/src/pydolphinscheduler/tasks/procedure.py
@@ -17,11 +17,11 @@
"""Task procedure."""
-from typing import Dict
+from typing import Dict, Optional
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.database import Database
from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.models.datasource import Datasource
class Procedure(Task):
@@ -42,11 +42,31 @@ class Procedure(Task):
_task_custom_attr = {"method"}
- def __init__(self, name: str, datasource_name: str, method: str, *args,
**kwargs):
def __init__(
    self,
    name: str,
    datasource_name: str,
    method: str,
    datasource_type: Optional[str] = None,
    *args,
    **kwargs
):
    """Create a procedure task.

    :param name: task name.
    :param datasource_name: name of the datasource the procedure runs against.
    :param method: procedure method, exported as the ``method`` task param.
    :param datasource_type: optional datasource type, used together with ``datasource_name``
        to select the datasource.

    Remaining positional and keyword arguments are forwarded to the parent task class.
    """
    super().__init__(name, TaskType.PROCEDURE, *args, **kwargs)
    # Datasource lookup keys; resolved later by the ``datasource`` property.
    self.datasource_name, self.datasource_type = datasource_name, datasource_type
    self.method = method
@property
def datasource(self) -> Dict:
    """Resolve the procedure task's datasource into its ``datasource`` id and ``type`` params."""
    usage = Datasource.get_task_usage_4j(self.datasource_name, self.datasource_type)
    return dict(datasource=usage.id, type=usage.type)
+
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
"""Override Task.task_params for produce task.
@@ -55,6 +75,5 @@ class Procedure(Task):
directly set as python property, so we Override Task.task_params here.
"""
params = super().task_params
- datasource = Database(self.datasource_name, "type", "datasource")
- params.update(datasource)
+ params.update(self.datasource)
return params
diff --git a/src/pydolphinscheduler/tasks/sql.py
b/src/pydolphinscheduler/tasks/sql.py
index 5a6d890..a320d43 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -22,8 +22,8 @@ import re
from typing import Dict, List, Optional, Sequence, Union
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.database import Database
from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.models.datasource import Datasource
log = logging.getLogger(__file__)
@@ -79,9 +79,10 @@ class Sql(Task):
name: str,
datasource_name: str,
sql: str,
+ datasource_type: Optional[str] = None,
sql_type: Optional[str] = None,
- pre_statements: Optional[Union[str, Sequence[str]]] = None,
- post_statements: Optional[Union[str, Sequence[str]]] = None,
+ pre_statements: Union[str, Sequence[str], None] = None,
+ post_statements: Union[str, Sequence[str], None] = None,
display_rows: Optional[int] = 10,
*args,
**kwargs
@@ -90,6 +91,7 @@ class Sql(Task):
super().__init__(name, TaskType.SQL, *args, **kwargs)
self.param_sql_type = sql_type
self.datasource_name = datasource_name
+ self.datasource_type = datasource_type
self.pre_statements = self.get_stm_list(pre_statements)
self.post_statements = self.get_stm_list(post_statements)
self.display_rows = display_rows
@@ -134,6 +136,17 @@ class Sql(Task):
else:
return SqlType.SELECT
@property
def datasource(self) -> Dict:
    """Get datasource for sql task: the datasource ``id`` as ``datasource`` and its ``type``.

    The datasource is looked up by ``datasource_name``, filtered by ``datasource_type`` when set.
    """
    datasource_task_u = Datasource.get_task_usage_4j(
        self.datasource_name, self.datasource_type
    )
    return {
        "datasource": datasource_task_u.id,
        "type": datasource_task_u.type,
    }
+
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
"""Override Task.task_params for sql task.
@@ -142,6 +155,5 @@ class Sql(Task):
directly set as python property, so we Override Task.task_params here.
"""
params = super().task_params
- datasource = Database(self.datasource_name, "type", "datasource")
- params.update(datasource)
+ params.update(self.datasource)
return params
diff --git a/tests/core/test_database.py b/tests/core/test_database.py
deleted file mode 100644
index 1286a4a..0000000
--- a/tests/core/test_database.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Test Database."""
-
-
-from unittest.mock import patch
-
-import pytest
-
-from pydolphinscheduler.core.database import Database
-
-TEST_DATABASE_DATASOURCE_NAME = "test_datasource"
-TEST_DATABASE_TYPE_KEY = "type"
-TEST_DATABASE_KEY = "datasource"
-
-
[email protected](
- "expect",
- [
- {
- TEST_DATABASE_TYPE_KEY: "mock_type",
- TEST_DATABASE_KEY: 1,
- }
- ],
-)
-@patch(
- "pydolphinscheduler.core.task.Task.gen_code_and_version",
- return_value=(123, 1),
-)
-@patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "mock_type"}),
-)
-def test_get_datasource_detail(mock_datasource, mock_code_version, expect):
- """Test :func:`get_database_type` and :func:`get_database_id` can return
expect value."""
- database_info = Database(
- TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_TYPE_KEY,
TEST_DATABASE_KEY
- )
- assert expect == database_info
diff --git a/tests/core/test_yaml_workflow.py b/tests/core/test_yaml_workflow.py
index 244965f..80e32c1 100644
--- a/tests/core/test_yaml_workflow.py
+++ b/tests/core/test_yaml_workflow.py
@@ -32,6 +32,7 @@ from pydolphinscheduler.core.yaml_workflow import (
get_task_cls,
)
from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from pydolphinscheduler.models.datasource import TaskUsage
from tests.testing.path import path_yaml_example
from tests.testing.task import Task
@@ -182,8 +183,8 @@ def test_get_error(task_type):
return_value=({"id": 1, "name": "test"}),
)
@patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "mock_type"}),
+ "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+ return_value=TaskUsage(id=1, type="MYSQL"),
)
@patch(
"pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/models/__init__.py
similarity index 70%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/models/__init__.py
index a23c768..9317c75 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/models/__init__.py
@@ -15,16 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-"""Init pydolphinscheduler.core package."""
-
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.engine import Engine
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.core.workflow import Workflow
-
-__all__ = [
- "Database",
- "Engine",
- "Workflow",
- "Task",
-]
+"""Init modules package tests."""
diff --git a/tests/models/test_database.py b/tests/models/test_database.py
new file mode 100644
index 0000000..44e7561
--- /dev/null
+++ b/tests/models/test_database.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Database."""
+import json
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.models.connection import Connection
+from pydolphinscheduler.models.datasource import Datasource
+
TEST_DATABASE_DATASOURCE_NAME = "test_datasource"
TEST_DATABASE_TYPE = "mysql"

# Raw ``connection_params`` content of a MySQL datasource, serialized to JSON below.
TEST_CONNECTION_PARAMS = dict(
    user="root",
    password="mysql",
    address="jdbc:mysql://127.0.0.1:3306",
    database="test",
    jdbcUrl="jdbc:mysql://127.0.0.1:3306/test",
    driverClassName="com.mysql.cj.jdbc.Driver",
    validationQuery="select 1",
)

# The Connection fields expected to be parsed from TEST_CONNECTION_PARAMS.
TEST_CONNECTION_ARG = dict(
    host="127.0.0.1",
    port=3306,
    schema="test",
    username="root",
    password="mysql",
)


datasource = Datasource(
    type_=TEST_DATABASE_TYPE,
    name=TEST_DATABASE_DATASOURCE_NAME,
    connection_params=json.dumps(TEST_CONNECTION_PARAMS),
    user_id=1,
    id_=1,
)
+
+
[email protected](
+ "attr, value",
+ [
+ ("connection", Connection(**TEST_CONNECTION_ARG)),
+ ("host", "127.0.0.1"),
+ ("port", 3306),
+ ("username", "root"),
+ ("password", "mysql"),
+ ("schema", "test"),
+ ],
+)
[email protected](Datasource, "get", return_value=datasource)
+def test_get_datasource_attr(mock_datasource, attr, value):
+ """Test get datasource attr."""
+ datasource_get = Datasource.get(TEST_DATABASE_DATASOURCE_NAME,
TEST_DATABASE_TYPE)
+ assert value == getattr(datasource_get, attr)
diff --git a/tests/tasks/test_datax.py b/tests/tasks/test_datax.py
index d1143bb..b174afa 100644
--- a/tests/tasks/test_datax.py
+++ b/tests/tasks/test_datax.py
@@ -21,6 +21,7 @@ from unittest.mock import patch
import pytest
+from pydolphinscheduler.models.datasource import Datasource, TaskUsage
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
from pydolphinscheduler.utils import file
@@ -40,10 +41,7 @@ def setup_crt_first(request):
delete_file(file_path)
-@patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "MYSQL"}),
-)
[email protected](Datasource, "get_task_usage_4j", return_value=TaskUsage(1,
"MYSQL"))
def test_datax_get_define(mock_datasource):
"""Test task datax function get_define."""
code = 123
diff --git a/tests/tasks/test_procedure.py b/tests/tasks/test_procedure.py
index 02bfaf5..1d0df55 100644
--- a/tests/tasks/test_procedure.py
+++ b/tests/tasks/test_procedure.py
@@ -21,6 +21,7 @@ from unittest.mock import patch
import pytest
+from pydolphinscheduler.models.datasource import TaskUsage
from pydolphinscheduler.tasks.procedure import Procedure
TEST_PROCEDURE_SQL = (
@@ -56,8 +57,8 @@ TEST_PROCEDURE_DATASOURCE_NAME = "test_datasource"
return_value=(123, 1),
)
@patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "MYSQL"}),
+ "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+ return_value=TaskUsage(id=1, type="MYSQL"),
)
def test_property_task_params(mock_datasource, mock_code_version, attr,
expect):
"""Test task sql task property."""
@@ -70,8 +71,8 @@ def test_property_task_params(mock_datasource,
mock_code_version, attr, expect):
return_value=(123, 1),
)
@patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "MYSQL"}),
+ "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+ return_value=TaskUsage(id=1, type="MYSQL"),
)
def test_sql_get_define(mock_datasource, mock_code_version):
"""Test task procedure function get_define."""
diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py
index 42a168d..f150820 100644
--- a/tests/tasks/test_sql.py
+++ b/tests/tasks/test_sql.py
@@ -21,6 +21,7 @@ from unittest.mock import patch
import pytest
+from pydolphinscheduler.models.datasource import TaskUsage
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.sql import Sql, SqlType
from pydolphinscheduler.utils import file
@@ -105,8 +106,8 @@ def test_get_stm_list(stm, expected) -> None:
return_value=(123, 1),
)
@patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "mock_type"}),
+ "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+ return_value=TaskUsage(id=1, type="mock_type"),
)
def test_get_sql_type(
mock_datasource, mock_code_version, sql, param_sql_type, sql_type
@@ -147,8 +148,8 @@ def test_get_sql_type(
return_value=(123, 1),
)
@patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "MYSQL"}),
+ "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+ return_value=TaskUsage(id=1, type="MYSQL"),
)
def test_property_task_params(mock_datasource, mock_code_version, attr,
expect):
"""Test task sql task property."""
@@ -157,8 +158,8 @@ def test_property_task_params(mock_datasource,
mock_code_version, attr, expect):
@patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "MYSQL"}),
+ "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+ return_value=TaskUsage(id=1, type="MYSQL"),
)
def test_sql_get_define(mock_datasource):
"""Test task sql function get_define."""