This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 4.0.x
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/4.0.x by this push:
new b56ba23 feat: Add sql_delimiter for sql task before version 320 (#115)
b56ba23 is described below
commit b56ba23f792fd8115c7f3eadd32bb4030694c533
Author: Jay Chung <[email protected]>
AuthorDate: Thu Oct 12 21:39:45 2023 +0800
feat: Add sql_delimiter for sql task before version 320 (#115)
fix: #107
---
src/pydolphinscheduler/core/parameter.py | 2 +-
src/pydolphinscheduler/core/task.py | 4 ++--
src/pydolphinscheduler/models/base.py | 2 +-
src/pydolphinscheduler/tasks/sql.py | 13 +++++++++++++
tests/tasks/test_sql.py | 2 ++
5 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/src/pydolphinscheduler/core/parameter.py b/src/pydolphinscheduler/core/parameter.py
index 6bac357..15ea723 100644
--- a/src/pydolphinscheduler/core/parameter.py
+++ b/src/pydolphinscheduler/core/parameter.py
@@ -49,7 +49,7 @@ class BaseDataType:
def __eq__(self, data):
return (
- type(self) == type(data)
+ type(self) is type(data)
and self.data_type == data.data_type
and self.value == data.value
)
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index b0f27e4..ef1c52d 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -260,13 +260,13 @@ class Task(Base):
"""Get task define attribute `resource_list`."""
resources = set()
for res in self._resource_list:
- if type(res) == str:
+ if isinstance(res, str):
resources.add(
Resource(
name=res, user_name=self.user_name
).get_fullname_from_database()
)
- elif type(res) == dict and ResourceKey.NAME in res:
+ elif isinstance(res, dict) and ResourceKey.NAME in res:
warnings.warn(
"""`resource_list` should be defined using List[str] with
resource paths,
the use of ids to define resources will be remove in
version 3.2.0.
diff --git a/src/pydolphinscheduler/models/base.py b/src/pydolphinscheduler/models/base.py
index 2647714..007edec 100644
--- a/src/pydolphinscheduler/models/base.py
+++ b/src/pydolphinscheduler/models/base.py
@@ -43,7 +43,7 @@ class Base:
return f'<{type(self).__name__}: name="{self.name}">'
def __eq__(self, other):
- return type(self) == type(other) and all(
+ return type(self) is type(other) and all(
getattr(self, a, None) == getattr(other, a, None) for a in
self._KEY_ATTR
)
diff --git a/src/pydolphinscheduler/tasks/sql.py b/src/pydolphinscheduler/tasks/sql.py
index a320d43..343caa2 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -19,6 +19,7 @@
import logging
import re
+import warnings
from typing import Dict, List, Optional, Sequence, Union
from pydolphinscheduler.constants import TaskType
@@ -58,6 +59,8 @@ class Sql(Task):
detected according to sql statement using
:func:`pydolphinscheduler.tasks.sql.Sql.sql_type`, and you
can also set it manually. by ``SqlType.SELECT`` for query statement or
``SqlType.NOT_SELECT`` for not
query statement.
+ :param sql_delimiter: SQL delimiter to split one sql statement into
multiple statements, ONLY support in
+ ``sql_type=SqlType.NOT_SELECT``, default is None.
:param pre_statements: SQL statements to be executed before the main SQL
statement.
:param post_statements: SQL statements to be executed after the main SQL
statement.
:param display_rows: The number of record rows number to be displayed in
the SQL task log, default is 10.
@@ -66,6 +69,7 @@ class Sql(Task):
_task_custom_attr = {
"sql",
"sql_type",
+ "segment_separator",
"pre_statements",
"post_statements",
"display_rows",
@@ -81,6 +85,7 @@ class Sql(Task):
sql: str,
datasource_type: Optional[str] = None,
sql_type: Optional[str] = None,
+ sql_delimiter: Optional[str] = None,
pre_statements: Union[str, Sequence[str], None] = None,
post_statements: Union[str, Sequence[str], None] = None,
display_rows: Optional[int] = 10,
@@ -90,6 +95,14 @@ class Sql(Task):
self._sql = sql
super().__init__(name, TaskType.SQL, *args, **kwargs)
self.param_sql_type = sql_type
+ if sql_type == SqlType.SELECT and sql_delimiter:
+ warnings.warn(
+ "Parameter `sql_delimiter` is only supported in
`sql_type=SqlType.NO_SELECT`, but current "
+ "sql_type is `sql_type=SqlType.SELECT`, so `sql_delimiter`
will be ignored.",
+ UserWarning,
+ stacklevel=2,
+ )
+ self.segment_separator = sql_delimiter or ""
self.datasource_name = datasource_name
self.datasource_type = datasource_type
self.pre_statements = self.get_stm_list(pre_statements)
diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py
index f150820..06f8d2c 100644
--- a/tests/tasks/test_sql.py
+++ b/tests/tasks/test_sql.py
@@ -131,6 +131,7 @@ def test_get_sql_type(
"type": "MYSQL",
"datasource": 1,
"sqlType": "0",
+ "segmentSeparator": "",
"preStatements": [],
"postStatements": [],
"displayRows": 10,
@@ -177,6 +178,7 @@ def test_sql_get_define(mock_datasource):
"preStatements": [],
"postStatements": [],
"localParams": [],
+ "segmentSeparator": "",
"resourceList": [],
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},