This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 7baac8f  feat: Introduce metaclass to convert JavaObject and refactor datasource (#75)
7baac8f is described below

commit 7baac8f9d9dbd4d77b5c51b9977ddd85f313dcf8
Author: Jay Chung <[email protected]>
AuthorDate: Mon Feb 27 14:47:47 2023 +0800

    feat: Introduce metaclass to convert JavaObject and refactor datasource (#75)
    
    * Introduce a metaclass to convert JavaObject to Python more easily
    * Add more details for datasource, including host, port, schema, etc.
    * Update the other modules that use datasource (the DataX, Procedure and SQL tasks)
---
 README.md                                          |   2 +-
 src/pydolphinscheduler/constants.py                |   7 +
 src/pydolphinscheduler/core/__init__.py            |   2 -
 src/pydolphinscheduler/core/database.py            |  62 -------
 src/pydolphinscheduler/java_gateway.py             |  12 +-
 .../{core/__init__.py => models/connection.py}     |  26 +--
 src/pydolphinscheduler/models/datasource.py        | 182 +++++++++++++++++++++
 src/pydolphinscheduler/models/meta.py              | 122 ++++++++++++++
 src/pydolphinscheduler/tasks/datax.py              |  44 ++++-
 src/pydolphinscheduler/tasks/procedure.py          |  29 +++-
 src/pydolphinscheduler/tasks/sql.py                |  22 ++-
 tests/core/test_database.py                        |  54 ------
 tests/core/test_yaml_workflow.py                   |   5 +-
 .../core => tests/models}/__init__.py              |  14 +-
 tests/models/test_database.py                      |  73 +++++++++
 tests/tasks/test_datax.py                          |   6 +-
 tests/tasks/test_procedure.py                      |   9 +-
 tests/tasks/test_sql.py                            |  13 +-
 18 files changed, 506 insertions(+), 178 deletions(-)

diff --git a/README.md b/README.md
index 8f56339..3bbcef8 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@ under the License.
 [![PyPi 
License](https://img.shields.io/:license-Apache%202-blue.svg?style=flat-square)](https://raw.githubusercontent.com/apache/dolphinscheduler-sdk-python/main/LICENSE)
 [![PyPi 
Status](https://img.shields.io/pypi/status/apache-dolphinscheduler.svg?style=flat-square)](https://pypi.org/project/apache-dolphinscheduler/)
 
[![Downloads](https://pepy.tech/badge/apache-dolphinscheduler/month)](https://pepy.tech/project/apache-dolphinscheduler)
-[![Coverage 
Status](https://img.shields.io/codecov/c/github/apache/dolphinscheduler-sdk-python/main.svg?style=flat-square)](https://codecov.io/github/apache/dolphinscheduler-sdk-python?branch=main)
+![Coverage 
Status](https://img.shields.io/codecov/c/github/apache/dolphinscheduler-sdk-python/main.svg?style=flat-square)
 [![Code style: 
black](https://img.shields.io/badge/code%20style-black-000000.svg?style=flat-square)](https://github.com/psf/black)
 [![Imports: 
isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat-square&labelColor=ef8336)](https://pycqa.github.io/isort)
 
[![CI](https://github.com/apache/dolphinscheduler-sdk-python/actions/workflows/ci.yaml/badge.svg)](https://github.com/apache/dolphinscheduler-sdk-python/actions/workflows/ci.yaml)
diff --git a/src/pydolphinscheduler/constants.py 
b/src/pydolphinscheduler/constants.py
index 1bd3c9f..b28d3b6 100644
--- a/src/pydolphinscheduler/constants.py
+++ b/src/pydolphinscheduler/constants.py
@@ -137,3 +137,10 @@ class Version(str):
 
     DS = "dolphinscheduler"
     FILE_NAME = "version_ext"
+
+
+class Keyword(str):
+    """Constants for keyword."""
+
+    DATASOURCE_ID = "id"
+    DATASOURCE_TYPE = "type"
diff --git a/src/pydolphinscheduler/core/__init__.py 
b/src/pydolphinscheduler/core/__init__.py
index a23c768..5a83e75 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/src/pydolphinscheduler/core/__init__.py
@@ -17,13 +17,11 @@
 
 """Init pydolphinscheduler.core package."""
 
-from pydolphinscheduler.core.database import Database
 from pydolphinscheduler.core.engine import Engine
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.core.workflow import Workflow
 
 __all__ = [
-    "Database",
     "Engine",
     "Workflow",
     "Task",
diff --git a/src/pydolphinscheduler/core/database.py 
b/src/pydolphinscheduler/core/database.py
deleted file mode 100644
index 0f5b4bb..0000000
--- a/src/pydolphinscheduler/core/database.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Module database."""
-
-from typing import Dict
-
-from py4j.protocol import Py4JJavaError
-
-from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import gateway
-
-
-class Database(dict):
-    """database object, get information about database.
-
-    You provider database_name contain connection information, it decisions 
which
-    database type and database instance would run task.
-    """
-
-    def __init__(self, database_name: str, type_key, database_key, *args, 
**kwargs):
-        super().__init__(*args, **kwargs)
-        self._database = {}
-        self.database_name = database_name
-        self[type_key] = self.database_type
-        self[database_key] = self.database_id
-
-    @property
-    def database_type(self) -> str:
-        """Get database type from java gateway, a wrapper for 
:func:`get_database_info`."""
-        return self.get_database_info(self.database_name).get("type")
-
-    @property
-    def database_id(self) -> str:
-        """Get database id from java gateway, a wrapper for 
:func:`get_database_info`."""
-        return self.get_database_info(self.database_name).get("id")
-
-    def get_database_info(self, name) -> Dict:
-        """Get database info from java gateway, contains database id, type, 
name."""
-        if self._database:
-            return self._database
-        else:
-            try:
-                self._database = gateway.get_datasource_info(name)
-            # Handler database source do not exists error, for now we just 
terminate the process.
-            except Py4JJavaError as ex:
-                raise PyDSParamException(str(ex.java_exception))
-            return self._database
diff --git a/src/pydolphinscheduler/java_gateway.py 
b/src/pydolphinscheduler/java_gateway.py
index 7ca6a79..91243f6 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -110,9 +110,15 @@ class GatewayEntryPoint:
         """Get the java gateway version, expected to be equal with 
pydolphinscheduler."""
         return self.gateway.entry_point.getGatewayVersion()
 
-    def get_datasource_info(self, name: str):
-        """Get datasource info through java gateway."""
-        return self.gateway.entry_point.getDatasourceInfo(name)
+    def get_datasource(self, name: str, type: Optional[str] = None):
+        """Get single datasource by java gateway.
+
+        Will use datasource_name to query database, and then filter by 
datasource_type if provided.
+
+        :param name: datasource name of the datasource to be queried
+        :param type: datasource type of the datasource, only used to filter 
the result.
+        """
+        return self.gateway.entry_point.getDatasource(name, type)
 
     def get_resources_file_info(self, program_type: str, main_package: str):
         """Get resources file info through java gateway."""
diff --git a/src/pydolphinscheduler/core/__init__.py 
b/src/pydolphinscheduler/models/connection.py
similarity index 71%
copy from src/pydolphinscheduler/core/__init__.py
copy to src/pydolphinscheduler/models/connection.py
index a23c768..ca2cc70 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/src/pydolphinscheduler/models/connection.py
@@ -15,16 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init pydolphinscheduler.core package."""
 
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.engine import Engine
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.core.workflow import Workflow
+"""DolphinScheduler Connection object.
 
-__all__ = [
-    "Database",
-    "Engine",
-    "Workflow",
-    "Task",
-]
+Including the basic information of database connection.
+"""
+
+from typing import NamedTuple
+
+
+class Connection(NamedTuple):
+    """Basic information of database connection."""
+
+    host: str
+    port: int
+    schema: str
+    username: str
+    password: str
diff --git a/src/pydolphinscheduler/models/datasource.py 
b/src/pydolphinscheduler/models/datasource.py
new file mode 100644
index 0000000..5602c30
--- /dev/null
+++ b/src/pydolphinscheduler/models/datasource.py
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Module database."""
+import json
+import re
+from typing import NamedTuple, Optional
+
+from py4j.java_gateway import JavaObject
+
+from pydolphinscheduler.java_gateway import gateway
+from pydolphinscheduler.models.connection import Connection
+from pydolphinscheduler.models.meta import ModelMeta
+
+
+class TaskUsage(NamedTuple):
+    """Class for task usage just like datasource in web ui."""
+
+    id: int
+    type: str
+
+
+class Datasource(metaclass=ModelMeta):
+    """Model datasource, communicate with DolphinScheduler API server and 
convert Java Object into Python.
+
+    We use metaclass :class:`pydolphinscheduler.models.meta.ModelMeta` to 
convert Java Object into Python.
+    And code in this class just call Java API method.
+
+    You provider database_name contain connection information, it decisions 
which
+    database type and database instance would run task.
+
+    :param id_: datasource id, the primary key of table t_ds_datasource.
+    :param name: datasource name, part of unique key (:param:``type_``, 
:param:``name``) for datasource
+        object, we support both query the datasource by name or by (type_ + 
name). But name must be required
+        unique when you want to query with the name only.
+    :param note: datasource description. A note for current datasource.
+    :param type_: datasource type, part of unique key (:param:``type_``, 
:param:``name``) for datasource.
+        It is a datasource type code instead of datasource type name. Optional 
when you query datasource by
+        name only. But it must be required when you create it.
+    :param user_id: user id for who create this datasource.
+    :param connection_params: datasource connection detail, including 
protocol, host, port, schema etc.
+        In json format and just like this:
+
+        .. code-block:: json
+
+            {
+                "user": "root",
+                "password": "mysql",
+                "address": "jdbc:mysql://127.0.0.1:3306",
+                "database": "test",
+                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
+                "driverClassName": "com.mysql.cj.jdbc.Driver",
+                "validationQuery": "select 1"
+            }
+
+    """
+
+    _PATTERN = re.compile("jdbc:.*://(?P<host>[\\w\\W]+):(?P<port>\\d+)")
+
+    _DATABASE_TYPE_MAP = dict(
+        mysql=0,
+        postgresql=1,
+        hive=2,
+        spark=3,
+        clickhouse=4,
+        oracle=5,
+        sqlserver=6,
+        db2=7,
+        presto=8,
+        h2=9,
+        redshift=10,
+        dameng=11,
+        starrocks=12,
+    )
+
+    def __init__(
+        self,
+        type_: str,
+        name: str,
+        connection_params: str,
+        user_id: Optional[int] = None,
+        id_: Optional[int] = None,
+        note: Optional[str] = None,
+    ):
+        self.id = id_
+        self.name = name
+        self.note = note
+        # TODO try to handle type_ in metaclass
+        self.type_: JavaObject = type_
+        self.user_id = user_id
+        self.connection_params = connection_params
+
+    @classmethod
+    def get(
+        cls, datasource_name: str, datasource_type: Optional[str] = None
+    ) -> "Datasource":
+        """Get single datasource.
+
+        :param datasource_name: datasource name
+        :param datasource_type: datasource type, if not provided, will get 
datasource by name only
+        """
+        datasource = gateway.get_datasource(datasource_name, datasource_type)
+        if datasource is None:
+            raise ValueError(
+                f"Datasource with name: {datasource_name} and type: 
{datasource_type} not found."
+            )
+        return datasource
+
+    @classmethod
+    def get_task_usage_4j(
+        cls, datasource_name: str, datasource_type: Optional[str] = None
+    ) -> TaskUsage:
+        """Get the necessary information of datasource for task usage in web 
UI."""
+        datasource: "Datasource" = cls.get(datasource_name, datasource_type)
+        return TaskUsage(
+            id=datasource.id,
+            type=datasource.type.upper(),
+        )
+
+    @property
+    def connection(self) -> Connection:
+        """Parse dolphinscheduler connection_params to Connection."""
+        data = json.loads(self.connection_params)
+
+        address_match = self._PATTERN.match(data.get("jdbcUrl", 
None)).groupdict()
+
+        return Connection(
+            host=address_match.get("host", None),
+            port=int(address_match.get("port", None)),
+            schema=data.get("database", None),
+            username=data.get("user", None),
+            password=data.get("password", None),
+        )
+
+    @property
+    def type(self) -> str:
+        """Property datasource type."""
+        return self.type_.getDescp()
+
+    @property
+    def type_code(self) -> str:
+        """Property datasource type."""
+        return self.type_.getCode()
+
+    @property
+    def host(self) -> str:
+        """Property datasource host, such as ``127.0.0.1`` or 
``localhosts``."""
+        return self.connection.host
+
+    @property
+    def port(self) -> int:
+        """Property datasource host, such as ``3306`` or ``5432``."""
+        return int(self.connection.port)
+
+    @property
+    def username(self) -> str:
+        """Property datasource username, such as ``root`` or ``postgres``."""
+        return self.connection.username
+
+    @property
+    def password(self) -> str:
+        """Property datasource password."""
+        return self.connection.password
+
+    @property
+    def schema(self) -> str:
+        """Property datasource schema."""
+        return self.connection.schema
diff --git a/src/pydolphinscheduler/models/meta.py 
b/src/pydolphinscheduler/models/meta.py
new file mode 100644
index 0000000..1d7f2d8
--- /dev/null
+++ b/src/pydolphinscheduler/models/meta.py
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Metadata class for module models.
+
+This module contains the ModelMeta class, which is used to convert 
``py4j.java_gateway.JavaObject`` to
+``pydolphinscheduler.models`` object. this is useful when you communicate with 
the DolphinScheduler
+server to get some resource from database, but you want to make sure the 
return object is a in Python
+object.
+"""
+
+from functools import wraps
+from inspect import signature
+from typing import Dict, Tuple
+
+from py4j.java_gateway import JavaObject
+
+from pydolphinscheduler.utils.string import snake2camel
+
+
+class ModelMeta(type):
+    """Mateclass convert ``py4j.java_gateway.JavaObject`` to python object 
more easily."""
+
+    _FUNC_INIT = "__init__"
+    _PARAM_SELF = "self"
+
+    def __new__(mcs, name: str, bases: Tuple, attrs: Dict):
+        """Create a new class."""
+        if mcs._FUNC_INIT not in attrs:
+            raise TypeError(
+                "Class with mateclass %s must have %s method",
+                (mcs.__name__, mcs._FUNC_INIT),
+            )
+
+        sig = signature(attrs.get(mcs._FUNC_INIT))
+        param = [
+            param.name
+            for name, param in sig.parameters.items()
+            if name != mcs._PARAM_SELF
+        ]
+
+        for attr_name, attr_value in attrs.items():
+            if isinstance(attr_value, classmethod) and not 
attr_name.startswith("__"):
+                attrs[attr_name] = mcs.j2p(attr_value, name, attrs, param)
+        return super(ModelMeta, mcs).__new__(mcs, name, bases, attrs)
+
+    @classmethod
+    def j2p(mcs, cm: classmethod, name: str, attrs: Dict, params=None):
+        """Convert JavaObject to Python object according attribute in the 
``__init__`` method.
+
+        ``py4j.java_gateway.JavaObject`` return the Java object from the 
DolphinScheduler server, we can
+        access the Java object attribute by ``getAttrName`` method. This 
method try to assign the Java object
+        attribute to the Python object attribute according the attribute in 
python ``__init__`` method.
+
+        For example, If the method return value is 
``py4j.java_gateway.JavaObject`` we temporary call it
+        ``JObject``, and we create a ``PObject`` object in Python, the 
``__init__`` method is:
+
+        .. code-block:: python
+
+            def __init__(
+                self,
+                name: str,
+                description: str
+            ):
+                self.name = name
+                self.description = description
+
+        Because the ``name`` and ``description`` is the attribute in the 
``__init__`` method, so this method
+        will try to call method ``getName`` and ``getDescription`` from the 
``JObject`` and assign the return
+        value to the ``PObject`` attribute. Just like this:
+
+        .. code-block:: python
+
+            return PObject(name=JObject.getName(), 
description=JObject.getDescription())
+        """
+
+        @wraps(cm)
+        def wrapper(*args, **kwargs):
+            class_ = type(name, (), attrs)
+
+            method_result = cm.__func__(class_, *args, **kwargs)
+
+            # skip convert if method result is not JavaObject, they maybe some 
property method
+            if not isinstance(method_result, JavaObject):
+                return method_result
+
+            obj_init_params = []
+            for param in params:
+                java_func_name = mcs.py4j_attr_func_name(param)
+                java_func = getattr(method_result, java_func_name)
+                obj_init_params.append(java_func())
+
+            return class_(*obj_init_params)
+
+        return wrapper
+
+    @classmethod
+    def py4j_attr_func_name(mcs, name: str) -> str:
+        """Convert python attribute name to py4j java attribute name.
+
+        Python attribute name is snake case, but py4j java attribute name is 
camel case. This method
+        will convert snake case to camel case with adding the ``get`` prefix. 
for example:
+
+        - attr -> getAttr
+        - attr_name -> getAttrName
+        - attr_ -> getAttr
+        """
+        return snake2camel(f"get_{name}")
diff --git a/src/pydolphinscheduler/tasks/datax.py 
b/src/pydolphinscheduler/tasks/datax.py
index 945f782..1dfa89c 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -20,8 +20,8 @@
 from typing import Dict, List, Optional
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.database import Database
 from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.models.datasource import Datasource
 
 
 class CustomDataX(Task):
@@ -57,12 +57,21 @@ class DataX(Task):
     """Task DataX object, declare behavior for DataX task to dolphinscheduler.
 
     It should run database datax job in multiply sql link engine, such as:
+
     - MySQL
     - Oracle
     - Postgresql
     - SQLServer
+
     You provider datasource_name and datatarget_name contain connection 
information, it decisions which
     database type and database instance would synchronous data.
+
+    :param name: task name.
+    :param datasource_name: source database name for task datax to extract 
data.
+    :param datatarget_name: target database name for task datax to load data.
+    :param sql: sql statement for task datax to extract data form source 
database.
+    :param target_table: target table name for task datax to load data into 
target database.
+    :param datasource_type: source database type, dolphinscheduler use
     """
 
     CUSTOM_CONFIG = 0
@@ -89,6 +98,8 @@ class DataX(Task):
         datatarget_name: str,
         sql: str,
         target_table: str,
+        datasource_type: Optional[str] = None,
+        datatarget_type: Optional[str] = None,
         job_speed_byte: Optional[int] = 0,
         job_speed_record: Optional[int] = 1000,
         pre_statements: Optional[List[str]] = None,
@@ -101,7 +112,9 @@ class DataX(Task):
         self._sql = sql
         super().__init__(name, TaskType.DATAX, *args, **kwargs)
         self.custom_config = self.CUSTOM_CONFIG
+        self.datasource_type = datasource_type
         self.datasource_name = datasource_name
+        self.datatarget_type = datatarget_type
         self.datatarget_name = datatarget_name
         self.target_table = target_table
         self.job_speed_byte = job_speed_byte
@@ -111,6 +124,28 @@ class DataX(Task):
         self.xms = xms
         self.xmx = xmx
 
+    @property
+    def source_params(self) -> Dict:
+        """Get source params for datax task."""
+        datasource_task_u = Datasource.get_task_usage_4j(
+            self.datasource_name, self.datasource_type
+        )
+        return {
+            "dsType": datasource_task_u.type,
+            "dataSource": datasource_task_u.id,
+        }
+
+    @property
+    def target_params(self) -> Dict:
+        """Get target params for datax task."""
+        datasource_task_u = Datasource.get_task_usage_4j(
+            self.datatarget_name, self.datatarget_type
+        )
+        return {
+            "dtType": datasource_task_u.type,
+            "dataTarget": datasource_task_u.id,
+        }
+
     @property
     def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
Dict:
         """Override Task.task_params for datax task.
@@ -119,9 +154,6 @@ class DataX(Task):
         directly set as python property, so we Override Task.task_params here.
         """
         params = super().task_params
-        datasource = Database(self.datasource_name, "dsType", "dataSource")
-        params.update(datasource)
-
-        datatarget = Database(self.datatarget_name, "dtType", "dataTarget")
-        params.update(datatarget)
+        params.update(self.source_params)
+        params.update(self.target_params)
         return params
diff --git a/src/pydolphinscheduler/tasks/procedure.py 
b/src/pydolphinscheduler/tasks/procedure.py
index 6383e07..5a9d87b 100644
--- a/src/pydolphinscheduler/tasks/procedure.py
+++ b/src/pydolphinscheduler/tasks/procedure.py
@@ -17,11 +17,11 @@
 
 """Task procedure."""
 
-from typing import Dict
+from typing import Dict, Optional
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.database import Database
 from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.models.datasource import Datasource
 
 
 class Procedure(Task):
@@ -42,11 +42,31 @@ class Procedure(Task):
 
     _task_custom_attr = {"method"}
 
-    def __init__(self, name: str, datasource_name: str, method: str, *args, 
**kwargs):
+    def __init__(
+        self,
+        name: str,
+        datasource_name: str,
+        method: str,
+        datasource_type: Optional[str] = None,
+        *args,
+        **kwargs
+    ):
         super().__init__(name, TaskType.PROCEDURE, *args, **kwargs)
         self.datasource_name = datasource_name
+        self.datasource_type = datasource_type
         self.method = method
 
+    @property
+    def datasource(self) -> Dict:
+        """Get datasource for procedure task."""
+        datasource_task_u = Datasource.get_task_usage_4j(
+            self.datasource_name, self.datasource_type
+        )
+        return {
+            "datasource": datasource_task_u.id,
+            "type": datasource_task_u.type,
+        }
+
     @property
     def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
Dict:
         """Override Task.task_params for produce task.
@@ -55,6 +75,5 @@ class Procedure(Task):
         directly set as python property, so we Override Task.task_params here.
         """
         params = super().task_params
-        datasource = Database(self.datasource_name, "type", "datasource")
-        params.update(datasource)
+        params.update(self.datasource)
         return params
diff --git a/src/pydolphinscheduler/tasks/sql.py 
b/src/pydolphinscheduler/tasks/sql.py
index 5a6d890..a320d43 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -22,8 +22,8 @@ import re
 from typing import Dict, List, Optional, Sequence, Union
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.database import Database
 from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.models.datasource import Datasource
 
 log = logging.getLogger(__file__)
 
@@ -79,9 +79,10 @@ class Sql(Task):
         name: str,
         datasource_name: str,
         sql: str,
+        datasource_type: Optional[str] = None,
         sql_type: Optional[str] = None,
-        pre_statements: Optional[Union[str, Sequence[str]]] = None,
-        post_statements: Optional[Union[str, Sequence[str]]] = None,
+        pre_statements: Union[str, Sequence[str], None] = None,
+        post_statements: Union[str, Sequence[str], None] = None,
         display_rows: Optional[int] = 10,
         *args,
         **kwargs
@@ -90,6 +91,7 @@ class Sql(Task):
         super().__init__(name, TaskType.SQL, *args, **kwargs)
         self.param_sql_type = sql_type
         self.datasource_name = datasource_name
+        self.datasource_type = datasource_type
         self.pre_statements = self.get_stm_list(pre_statements)
         self.post_statements = self.get_stm_list(post_statements)
         self.display_rows = display_rows
@@ -134,6 +136,17 @@ class Sql(Task):
         else:
             return SqlType.SELECT
 
+    @property
+    def datasource(self) -> Dict:
+        """Get datasource for procedure sql."""
+        datasource_task_u = Datasource.get_task_usage_4j(
+            self.datasource_name, self.datasource_type
+        )
+        return {
+            "datasource": datasource_task_u.id,
+            "type": datasource_task_u.type,
+        }
+
     @property
     def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
Dict:
         """Override Task.task_params for sql task.
@@ -142,6 +155,5 @@ class Sql(Task):
         directly set as python property, so we Override Task.task_params here.
         """
         params = super().task_params
-        datasource = Database(self.datasource_name, "type", "datasource")
-        params.update(datasource)
+        params.update(self.datasource)
         return params
diff --git a/tests/core/test_database.py b/tests/core/test_database.py
deleted file mode 100644
index 1286a4a..0000000
--- a/tests/core/test_database.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Test Database."""
-
-
-from unittest.mock import patch
-
-import pytest
-
-from pydolphinscheduler.core.database import Database
-
-TEST_DATABASE_DATASOURCE_NAME = "test_datasource"
-TEST_DATABASE_TYPE_KEY = "type"
-TEST_DATABASE_KEY = "datasource"
-
-
[email protected](
-    "expect",
-    [
-        {
-            TEST_DATABASE_TYPE_KEY: "mock_type",
-            TEST_DATABASE_KEY: 1,
-        }
-    ],
-)
-@patch(
-    "pydolphinscheduler.core.task.Task.gen_code_and_version",
-    return_value=(123, 1),
-)
-@patch(
-    "pydolphinscheduler.core.database.Database.get_database_info",
-    return_value=({"id": 1, "type": "mock_type"}),
-)
-def test_get_datasource_detail(mock_datasource, mock_code_version, expect):
-    """Test :func:`get_database_type` and :func:`get_database_id` can return 
expect value."""
-    database_info = Database(
-        TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_TYPE_KEY, 
TEST_DATABASE_KEY
-    )
-    assert expect == database_info
diff --git a/tests/core/test_yaml_workflow.py b/tests/core/test_yaml_workflow.py
index 244965f..80e32c1 100644
--- a/tests/core/test_yaml_workflow.py
+++ b/tests/core/test_yaml_workflow.py
@@ -32,6 +32,7 @@ from pydolphinscheduler.core.yaml_workflow import (
     get_task_cls,
 )
 from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from pydolphinscheduler.models.datasource import TaskUsage
 from tests.testing.path import path_yaml_example
 from tests.testing.task import Task
 
@@ -182,8 +183,8 @@ def test_get_error(task_type):
     return_value=({"id": 1, "name": "test"}),
 )
 @patch(
-    "pydolphinscheduler.core.database.Database.get_database_info",
-    return_value=({"id": 1, "type": "mock_type"}),
+    "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+    return_value=TaskUsage(id=1, type="MYSQL"),
 )
 @patch(
     "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/models/__init__.py
similarity index 70%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/models/__init__.py
index a23c768..9317c75 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/models/__init__.py
@@ -15,16 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init pydolphinscheduler.core package."""
-
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.engine import Engine
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.core.workflow import Workflow
-
-__all__ = [
-    "Database",
-    "Engine",
-    "Workflow",
-    "Task",
-]
+"""Init modules package tests."""
diff --git a/tests/models/test_database.py b/tests/models/test_database.py
new file mode 100644
index 0000000..44e7561
--- /dev/null
+++ b/tests/models/test_database.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Database."""
+import json
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.models.connection import Connection
+from pydolphinscheduler.models.datasource import Datasource
+
+TEST_DATABASE_DATASOURCE_NAME = "test_datasource"
+TEST_DATABASE_TYPE = "mysql"
+
+TEST_CONNECTION_PARAMS = {
+    "user": "root",
+    "password": "mysql",
+    "address": "jdbc:mysql://127.0.0.1:3306",
+    "database": "test",
+    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
+    "driverClassName": "com.mysql.cj.jdbc.Driver",
+    "validationQuery": "select 1",
+}
+
+TEST_CONNECTION_ARG = {
+    "host": "127.0.0.1",
+    "port": 3306,
+    "schema": "test",
+    "username": "root",
+    "password": "mysql",
+}
+
+
+datasource = Datasource(
+    id_=1,
+    type_=TEST_DATABASE_TYPE,
+    name=TEST_DATABASE_DATASOURCE_NAME,
+    connection_params=json.dumps(TEST_CONNECTION_PARAMS),
+    user_id=1,
+)
+
+
+@pytest.mark.parametrize(
+    "attr, value",
+    [
+        ("connection", Connection(**TEST_CONNECTION_ARG)),
+        ("host", "127.0.0.1"),
+        ("port", 3306),
+        ("username", "root"),
+        ("password", "mysql"),
+        ("schema", "test"),
+    ],
+)
+@patch.object(Datasource, "get", return_value=datasource)
+def test_get_datasource_attr(mock_datasource, attr, value):
+    """Test get datasource attr."""
+    datasource_get = Datasource.get(TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_TYPE)
+    assert value == getattr(datasource_get, attr)
diff --git a/tests/tasks/test_datax.py b/tests/tasks/test_datax.py
index d1143bb..b174afa 100644
--- a/tests/tasks/test_datax.py
+++ b/tests/tasks/test_datax.py
@@ -21,6 +21,7 @@ from unittest.mock import patch
 
 import pytest
 
+from pydolphinscheduler.models.datasource import Datasource, TaskUsage
 from pydolphinscheduler.resources_plugin import Local
 from pydolphinscheduler.tasks.datax import CustomDataX, DataX
 from pydolphinscheduler.utils import file
@@ -40,10 +41,7 @@ def setup_crt_first(request):
     delete_file(file_path)
 
 
-@patch(
-    "pydolphinscheduler.core.database.Database.get_database_info",
-    return_value=({"id": 1, "type": "MYSQL"}),
-)
+@patch.object(Datasource, "get_task_usage_4j", return_value=TaskUsage(1, "MYSQL"))
 def test_datax_get_define(mock_datasource):
     """Test task datax function get_define."""
     code = 123
diff --git a/tests/tasks/test_procedure.py b/tests/tasks/test_procedure.py
index 02bfaf5..1d0df55 100644
--- a/tests/tasks/test_procedure.py
+++ b/tests/tasks/test_procedure.py
@@ -21,6 +21,7 @@ from unittest.mock import patch
 
 import pytest
 
+from pydolphinscheduler.models.datasource import TaskUsage
 from pydolphinscheduler.tasks.procedure import Procedure
 
 TEST_PROCEDURE_SQL = (
@@ -56,8 +57,8 @@ TEST_PROCEDURE_DATASOURCE_NAME = "test_datasource"
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.core.database.Database.get_database_info",
-    return_value=({"id": 1, "type": "MYSQL"}),
+    "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+    return_value=TaskUsage(id=1, type="MYSQL"),
 )
 def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
     """Test task sql task property."""
@@ -70,8 +71,8 @@ def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.core.database.Database.get_database_info",
-    return_value=({"id": 1, "type": "MYSQL"}),
+    "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+    return_value=TaskUsage(id=1, type="MYSQL"),
 )
 def test_sql_get_define(mock_datasource, mock_code_version):
     """Test task procedure function get_define."""
diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py
index 42a168d..f150820 100644
--- a/tests/tasks/test_sql.py
+++ b/tests/tasks/test_sql.py
@@ -21,6 +21,7 @@ from unittest.mock import patch
 
 import pytest
 
+from pydolphinscheduler.models.datasource import TaskUsage
 from pydolphinscheduler.resources_plugin import Local
 from pydolphinscheduler.tasks.sql import Sql, SqlType
 from pydolphinscheduler.utils import file
@@ -105,8 +106,8 @@ def test_get_stm_list(stm, expected) -> None:
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.core.database.Database.get_database_info",
-    return_value=({"id": 1, "type": "mock_type"}),
+    "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+    return_value=TaskUsage(id=1, type="mock_type"),
 )
 def test_get_sql_type(
     mock_datasource, mock_code_version, sql, param_sql_type, sql_type
@@ -147,8 +148,8 @@ def test_get_sql_type(
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.core.database.Database.get_database_info",
-    return_value=({"id": 1, "type": "MYSQL"}),
+    "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+    return_value=TaskUsage(id=1, type="MYSQL"),
 )
 def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
     """Test task sql task property."""
@@ -157,8 +158,8 @@ def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
 
 
 @patch(
-    "pydolphinscheduler.core.database.Database.get_database_info",
-    return_value=({"id": 1, "type": "MYSQL"}),
+    "pydolphinscheduler.models.datasource.Datasource.get_task_usage_4j",
+    return_value=TaskUsage(id=1, type="MYSQL"),
 )
 def test_sql_get_define(mock_datasource):
     """Test task sql function get_define."""


Reply via email to