This is an automated email from the ASF dual-hosted git repository. zhongjiajie pushed a commit to branch commercial in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
commit caa4133a838258430c655a2acd07b24a6612f4c4 Author: Jay Chung <[email protected]> AuthorDate: Mon Sep 18 17:18:37 2023 +0800 get source code of 4.0.2a11 --- src/pydolphinscheduler/__init__.py | 2 +- src/pydolphinscheduler/core/workflow.py | 49 ++++------------ src/pydolphinscheduler/default_config.yaml | 4 +- .../examples/task_datax_example.py | 3 +- .../{tutorial.py => task_python_example.py} | 65 +++++++++++++++++----- .../{tutorial.py => task_shell_example.py} | 51 +++++++++++------ src/pydolphinscheduler/examples/tutorial.py | 4 +- src/pydolphinscheduler/java_gateway.py | 10 ++-- src/pydolphinscheduler/models/datasource.py | 57 +++++++------------ src/pydolphinscheduler/tasks/datax.py | 18 ++---- src/pydolphinscheduler/tasks/mixin/__init__.py | 0 .../tasks/mixin/datasource_list_mixin.py | 28 ++++++++++ .../tasks/mixin/remote_connection_mixin.py | 29 ++++++++++ src/pydolphinscheduler/tasks/procedure.py | 17 +----- src/pydolphinscheduler/tasks/python.py | 29 ++++++++-- src/pydolphinscheduler/tasks/shell.py | 31 ++++++++++- src/pydolphinscheduler/tasks/sql.py | 6 +- 17 files changed, 244 insertions(+), 159 deletions(-) diff --git a/src/pydolphinscheduler/__init__.py b/src/pydolphinscheduler/__init__.py index 17d52ad..7419152 100644 --- a/src/pydolphinscheduler/__init__.py +++ b/src/pydolphinscheduler/__init__.py @@ -17,4 +17,4 @@ """Init root of pydolphinscheduler.""" -__version__ = "v4.0.2a8" +__version__ = "4.0.2a11" diff --git a/src/pydolphinscheduler/core/workflow.py b/src/pydolphinscheduler/core/workflow.py index 582faa6..e0e00b8 100644 --- a/src/pydolphinscheduler/core/workflow.py +++ b/src/pydolphinscheduler/core/workflow.py @@ -22,13 +22,13 @@ from datetime import datetime from typing import Any, Dict, List, Optional, Set, Union from pydolphinscheduler import configuration -from pydolphinscheduler.constants import Symbol, TaskType +from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core.resource import Resource from pydolphinscheduler.core.resource_plugin import ResourcePlugin from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException from pydolphinscheduler.java_gateway import gateway from pydolphinscheduler.models import Base, Project, User -from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule +from pydolphinscheduler.utils.date import conv_from_str class WorkflowContext: @@ -119,8 +119,7 @@ class Workflow(Base): self, name: str, description: Optional[str] = None, - schedule: Optional[str] = None, - online_schedule: Optional[bool] = None, + timing_name: Optional[str] = None, start_time: Optional[Union[str, datetime]] = None, end_time: Optional[Union[str, datetime]] = None, timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE, @@ -139,22 +138,9 @@ class Workflow(Base): **kwargs, ): super().__init__(name, description) - self.schedule = schedule.strip() if schedule else schedule - if ( - self.schedule - and self.schedule.count(Symbol.BLANK) != self._EXPECT_SCHEDULE_CHAR_NUM - 1 - ): - raise PyDSParamException( - "Invlaid parameter schedule, expect crontab char is %d but get %s", - self._EXPECT_SCHEDULE_CHAR_NUM, - schedule, - ) + self.timing_name = timing_name.strip() if timing_name else timing_name # handle workflow schedule state according to init value - if self.schedule and online_schedule is None: - self.online_schedule = True - else: - self.online_schedule = online_schedule or False self._start_time = start_time self._end_time = end_time self.timezone = timezone @@ -190,6 +176,11 @@ class Workflow(Base): self._workflow_code = None self.resource_list = resource_list or [] + # do not thing but for air2phin migrate + self.schedule = None + if 'schedule' in kwargs or 'schedule_interval' in kwargs: + self.schedule = kwargs.get('schedule') or kwargs.get('schedule_interval') + def __enter__(self) -> "Workflow": WorkflowContext.set(self) return self @@ -306,25 +297,6 @@ class Workflow(Base): self._handle_root_relation() return [tr.get_define() for tr in self._task_relations] - @property - def schedule_json(self) -> Optional[Dict]: - """Get schedule parameter json object. This is requests from java gateway interface.""" - if not self.schedule: - return None - else: - start_time = conv_to_schedule( - self.start_time if self.start_time else datetime.now() - ) - end_time = conv_to_schedule( - self.end_time if self.end_time else MAX_DATETIME - ) - return { - "startTime": start_time, - "endTime": end_time, - "crontab": self.schedule, - "timezoneId": self.timezone, - } - @property def task_list(self) -> List["Task"]: # noqa: F821 """Return list of tasks objects.""" @@ -451,8 +423,7 @@ class Workflow(Base): # TODO add serialization function json.dumps(self.task_relation_json), json.dumps(self.task_definition_json), - json.dumps(self.schedule_json) if self.schedule_json else None, - self.online_schedule, + self.timing_name, None, ) return self._workflow_code diff --git a/src/pydolphinscheduler/default_config.yaml b/src/pydolphinscheduler/default_config.yaml index a382413..e75cec3 100644 --- a/src/pydolphinscheduler/default_config.yaml +++ b/src/pydolphinscheduler/default_config.yaml @@ -47,13 +47,13 @@ default: state: 1 # Default value for dolphinscheduler's workflow object workflow: - project: project-pydolphin + project: pydolphinscheduler user: admin queue: queuePythonGateway worker_group: default # Release state of workflow, default value is ``online`` which mean setting workflow online when it submits # to Java gateway, if you want to set workflow offline set its value to ``offline`` - release_state: online + release_state: offline time_zone: Asia/Shanghai # Warning type of the workflow, default value is ``NONE`` mean do not warn user in any cases of workflow state, # change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are diff --git a/src/pydolphinscheduler/examples/task_datax_example.py b/src/pydolphinscheduler/examples/task_datax_example.py index 6fdf779..33511ab 100644 --- a/src/pydolphinscheduler/examples/task_datax_example.py +++ b/src/pydolphinscheduler/examples/task_datax_example.py @@ -74,6 +74,7 @@ JSON_TEMPLATE = { with Workflow( name="task_datax_example", + release_state="offline", ) as workflow: # This task synchronizes the data in `t_ds_project` # of `first_mysql` database to `target_project` of `second_mysql` database. @@ -89,6 +90,6 @@ with Workflow( # You can custom json_template of datax to sync data. This task create a new # datax job same as task1, transfer record from `first_mysql` to `second_mysql` - task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE)) + task2 = CustomDataX(name="task_custom_datax", config=JSON_TEMPLATE) workflow.run() # [end workflow_declare] diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/task_python_example.py similarity index 64% copy from src/pydolphinscheduler/examples/tutorial.py copy to src/pydolphinscheduler/examples/task_python_example.py index 2814caf..681867b 100644 --- a/src/pydolphinscheduler/examples/tutorial.py +++ b/src/pydolphinscheduler/examples/task_python_example.py @@ -30,37 +30,74 @@ task_parent --> --> task_union it will instantiate and run all the task it have. """ +import time +from datetime import datetime + # [start tutorial] # [start package_import] # Import Workflow object to define your workflow attributes from pydolphinscheduler.core.workflow import Workflow # Import task Shell object cause we would create some shell tasks later -from pydolphinscheduler.tasks.shell import Shell +from pydolphinscheduler.tasks.func_wrap import task +from pydolphinscheduler.tasks.python import Python # [end package_import] +scope_global = "global-var" + + +# [start task_declare] +@task +def print_something(): + """First task in this workflow.""" + print("hello python function wrap task") + + +@task +def depend_import(): + """Depend on import module.""" + time.sleep(2) + + +definition_str = """ +from datetime import datetime + +def foo(): + print("hello world in function foo") + print(f"Current time is {datetime.now()}") +""" + + +def bar(): + print("hello world in function bar") + print(f"Current time is {datetime.now()}") + + +# [end task_declare] + + # [start workflow_declare] with Workflow( - name="tutorial", + name="task_python", schedule="0 0 0 * * ? *", start_time="2021-01-01", - online_schedule=False, - release_state="offline", ) as workflow: # [end workflow_declare] - # [start task_declare] - task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler") - task_child_one = Shell(name="task_child_one", command="echo 'child one'") - task_child_two = Shell(name="task_child_two", command="echo 'child two'") - task_union = Shell(name="task_union", command="echo union") - # [end task_declare] - # [start task_relation_declare] - task_group = [task_child_one, task_child_two] - task_parent.set_downstream(task_group) + python_str = Python( + name="python_str", + definition=definition_str, + ) - task_union << task_group + python_func = Python( + name="python_func", + definition=bar, + datasource_name=["mysql-meta"], + ) + + # [start task_relation_declare] + print_something() >> depend_import() >> python_str >> python_func # [end task_relation_declare] # [start submit_or_run] diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/task_shell_example.py similarity index 65% copy from src/pydolphinscheduler/examples/tutorial.py copy to src/pydolphinscheduler/examples/task_shell_example.py index 2814caf..7eda184 100644 --- a/src/pydolphinscheduler/examples/tutorial.py +++ b/src/pydolphinscheduler/examples/task_shell_example.py @@ -38,32 +38,47 @@ from pydolphinscheduler.core.workflow import Workflow # Import task Shell object cause we would create some shell tasks later from pydolphinscheduler.tasks.shell import Shell -# [end package_import] - # [start workflow_declare] with Workflow( - name="tutorial", + name="task_shell", schedule="0 0 0 * * ? *", start_time="2021-01-01", - online_schedule=False, - release_state="offline", ) as workflow: # [end workflow_declare] - # [start task_declare] - task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler") - task_child_one = Shell(name="task_child_one", command="echo 'child one'") - task_child_two = Shell(name="task_child_two", command="echo 'child two'") - task_union = Shell(name="task_union", command="echo union") - # [end task_declare] + simple = Shell(name="simple", command="echo simple") + + datasource_source = Shell( + name="datasource_source", + command=""" + echo "${getConnectionHost('mysql-meta')}" + echo "${getConnectionUsername('mysql-target')}" + """, + datasource_name=[ + "mysql-meta", + "mysql-target", + ], + ) + + remote_connection = Shell( + name="remote_connection", + command="ls /tmp", + remote_connection="remote-ssh-ws3", + ) - # [start task_relation_declare] - task_group = [task_child_one, task_child_two] - task_parent.set_downstream(task_group) + mixin = Shell( + name="mixin", + command=""" + echo "${getConnectionHost('mysql-meta')}" + echo "${getConnectionUsername('mysql-target')}" + """, + remote_connection="remote-ssh-ws3", + datasource_name=[ + "mysql-meta", + "mysql-target", + ], + ) - task_union << task_group - # [end task_relation_declare] + simple >> datasource_source >> remote_connection >> mixin # [start submit_or_run] workflow.submit() - # [end submit_or_run] -# [end tutorial] diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/tutorial.py index 2814caf..11b3d3d 100644 --- a/src/pydolphinscheduler/examples/tutorial.py +++ b/src/pydolphinscheduler/examples/tutorial.py @@ -43,10 +43,8 @@ from pydolphinscheduler.tasks.shell import Shell # [start workflow_declare] with Workflow( name="tutorial", - schedule="0 0 0 * * ? *", start_time="2021-01-01", - online_schedule=False, - release_state="offline", + release_state="online", ) as workflow: # [end workflow_declare] # [start task_declare] diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py index 429f1ac..4b1476e 100644 --- a/src/pydolphinscheduler/java_gateway.py +++ b/src/pydolphinscheduler/java_gateway.py @@ -88,7 +88,7 @@ class GatewayEntryPoint: """Get the java gateway version, expected to be equal with pydolphinscheduler.""" return self.gateway.entry_point.getGatewayVersion() - def get_datasource(self, name: str, type: Optional[str] = None): + def get_datasource(self, name: str): """Get single datasource by java gateway. Will use datasource_name to query database, and then filter by datasource_type if provided. @@ -96,7 +96,7 @@ class GatewayEntryPoint: :param name: datasource name of the datasource to be queried :param type: datasource type of the datasource, only used to filter the result. """ - return self.gateway.entry_point.getDatasource(name, type) + return self.gateway.entry_point.getDatasource(name) def get_resources_file_info(self, program_type: str, main_package: str): """Get resources file info through java gateway.""" @@ -251,8 +251,7 @@ class GatewayEntryPoint: release_state: int, task_relation_json: str, task_definition_json: str, - schedule: Optional[str] = None, - online_schedule: Optional[bool] = None, + timingName: Optional[str] = None, other_params_json: Optional[str] = None, ): """Create or update workflow through java gateway.""" @@ -262,8 +261,7 @@ class GatewayEntryPoint: name, description, global_params, - schedule, - online_schedule, + timingName, warning_type, warning_group_id, timeout, diff --git a/src/pydolphinscheduler/models/datasource.py b/src/pydolphinscheduler/models/datasource.py index 7c70902..3b55f21 100644 --- a/src/pydolphinscheduler/models/datasource.py +++ b/src/pydolphinscheduler/models/datasource.py @@ -20,8 +20,6 @@ import json import re from typing import NamedTuple, Optional -from py4j.java_gateway import JavaObject - from pydolphinscheduler.java_gateway import gateway from pydolphinscheduler.models.connection import Connection from pydolphinscheduler.models.meta import ModelMeta @@ -32,6 +30,7 @@ class TaskUsage(NamedTuple): id: str type: str + name: str class Datasource(metaclass=ModelMeta): @@ -91,52 +90,46 @@ class Datasource(metaclass=ModelMeta): def __init__( self, - st_db_type: str, - name: str, - connection_params: str, - user_id: Optional[int] = None, + datasource_name: str, + plugin_name: str, + plugin_version: str, + datasource_config: str = None, id_: Optional[int] = None, - note: Optional[str] = None, + description: Optional[str] = None, ): self.id = id_ - self.name = name - self.note = note - # TODO try to handle type_ in metaclass - self.st_db_type: JavaObject = st_db_type - self.user_id = user_id - self.connection_params = connection_params + self.datasource_name = datasource_name + self.plugin_name = plugin_name + self.plugin_version = plugin_version + self.datasource_config = datasource_config + self.description = description @classmethod - def get( - cls, datasource_name: str, datasource_type: Optional[str] = None - ) -> "Datasource": + def get(cls, datasource_name: str) -> "Datasource": """Get single datasource. :param datasource_name: datasource name :param datasource_type: datasource type, if not provided, will get datasource by name only """ - datasource = gateway.get_datasource(datasource_name, datasource_type) + datasource = gateway.get_datasource(datasource_name) if datasource is None: - raise ValueError( - f"Datasource with name: {datasource_name} and type: {datasource_type} not found." - ) + raise ValueError(f"Datasource with name: {datasource_name} not found.") return datasource @classmethod - def get_task_usage_4j( - cls, datasource_name: str, datasource_type: Optional[str] = None - ) -> TaskUsage: + def get_task_usage_4j(cls, datasource_name: str) -> TaskUsage: """Get the necessary information of datasource for task usage in web UI.""" - datasource: "Datasource" = cls.get(datasource_name, datasource_type) + datasource: "Datasource" = cls.get(datasource_name) return TaskUsage( id=str(datasource.id), - type=datasource.type, + type=datasource.plugin_name, + name=datasource.datasource_name, ) @property def connection(self) -> Connection: - """Parse dolphinscheduler connection_params to Connection.""" - data = json.loads(self.connection_params) + """Parse dolphinscheduler datasource_config to Connection.""" + data = json.loads(self.datasource_config) pattern_match = self._PATTERN.match( data.get("jdbcUrl", data.get("url", None)) @@ -150,16 +143,6 @@ class Datasource(metaclass=ModelMeta): password=data.get("password", None), ) - @property - def type(self) -> str: - """Property datasource type.""" - return self.st_db_type.getDescp() - - @property - def type_code(self) -> str: - """Property datasource type.""" - return self.st_db_type.getCode() - @property def host(self) -> str: """Property datasource host, such as ``127.0.0.1`` or ``localhosts``.""" diff --git a/src/pydolphinscheduler/tasks/datax.py b/src/pydolphinscheduler/tasks/datax.py index 1dfa89c..1cb438c 100644 --- a/src/pydolphinscheduler/tasks/datax.py +++ b/src/pydolphinscheduler/tasks/datax.py @@ -16,7 +16,7 @@ # under the License. """Task datax.""" - +import json as json_pkg from typing import Dict, List, Optional from pydolphinscheduler.constants import TaskType @@ -40,13 +40,13 @@ class CustomDataX(Task): def __init__( self, name: str, - json: str, + config: dict, xms: Optional[int] = 1, xmx: Optional[int] = 1, *args, **kwargs ): - self._json = json + self._json = json_pkg.dumps(config, indent=2) super().__init__(name, TaskType.DATAX, *args, **kwargs) self.custom_config = self.CUSTOM_CONFIG self.xms = xms @@ -98,8 +98,6 @@ class DataX(Task): datatarget_name: str, sql: str, target_table: str, - datasource_type: Optional[str] = None, - datatarget_type: Optional[str] = None, job_speed_byte: Optional[int] = 0, job_speed_record: Optional[int] = 1000, pre_statements: Optional[List[str]] = None, @@ -112,9 +110,7 @@ class DataX(Task): self._sql = sql super().__init__(name, TaskType.DATAX, *args, **kwargs) self.custom_config = self.CUSTOM_CONFIG - self.datasource_type = datasource_type self.datasource_name = datasource_name - self.datatarget_type = datatarget_type self.datatarget_name = datatarget_name self.target_table = target_table self.job_speed_byte = job_speed_byte @@ -127,9 +123,7 @@ class DataX(Task): @property def source_params(self) -> Dict: """Get source params for datax task.""" - datasource_task_u = Datasource.get_task_usage_4j( - self.datasource_name, self.datasource_type - ) + datasource_task_u = Datasource.get_task_usage_4j(self.datasource_name) return { "dsType": datasource_task_u.type, "dataSource": datasource_task_u.id, @@ -138,9 +132,7 @@ class DataX(Task): @property def target_params(self) -> Dict: """Get target params for datax task.""" - datasource_task_u = Datasource.get_task_usage_4j( - self.datatarget_name, self.datatarget_type - ) + datasource_task_u = Datasource.get_task_usage_4j(self.datatarget_name) return { "dtType": datasource_task_u.type, "dataTarget": datasource_task_u.id, diff --git a/src/pydolphinscheduler/tasks/mixin/__init__.py b/src/pydolphinscheduler/tasks/mixin/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pydolphinscheduler/tasks/mixin/datasource_list_mixin.py b/src/pydolphinscheduler/tasks/mixin/datasource_list_mixin.py new file mode 100644 index 0000000..d94c0f3 --- /dev/null +++ b/src/pydolphinscheduler/tasks/mixin/datasource_list_mixin.py @@ -0,0 +1,28 @@ +from typing import Dict, List, NamedTuple, Optional + +from pydolphinscheduler.models.datasource import Datasource + + +class DatasourceType(NamedTuple): + type: str + datasource: str + + +class DatasourceList(NamedTuple): + dataSourceList: List[Dict] + + +class DatasourceListMixin: + datasource_name: Optional[List[str]] + + def get_datasource(self) -> Dict: + datasource_list_detail = [] + for name in self.datasource_name: + datasource_task_u = Datasource.get_task_usage_4j(name) + datasource_itme = DatasourceType( + type=datasource_task_u.type, datasource=datasource_task_u.id + ) + datasource_list_detail.append(datasource_itme._asdict()) + + datasource_list = DatasourceList(dataSourceList=datasource_list_detail) + return datasource_list._asdict() diff --git a/src/pydolphinscheduler/tasks/mixin/remote_connection_mixin.py b/src/pydolphinscheduler/tasks/mixin/remote_connection_mixin.py new file mode 100644 index 0000000..3890a47 --- /dev/null +++ b/src/pydolphinscheduler/tasks/mixin/remote_connection_mixin.py @@ -0,0 +1,29 @@ +from typing import Dict, NamedTuple, Optional + +from pydolphinscheduler.models.datasource import Datasource + + +class RemoteConnectionType(NamedTuple): + open: bool + datasourceType: str + datasourceId: str + + +class RemoteConnectionList(NamedTuple): + remoteConnection: Dict + + +class RemoteConnectionMixin: + remote_connection: Optional[str] + + def get_remote_connection(self) -> Dict: + if self.remote_connection is not None: + connection = Datasource.get_task_usage_4j(self.remote_connection) + connection_itme = RemoteConnectionType( + open=True, datasourceType=connection.type, datasourceId=connection.id + ) + + datasource_list = RemoteConnectionList( + remoteConnection=connection_itme._asdict() + ) + return datasource_list._asdict() diff --git a/src/pydolphinscheduler/tasks/procedure.py b/src/pydolphinscheduler/tasks/procedure.py index 5a9d87b..7620f4d 100644 --- a/src/pydolphinscheduler/tasks/procedure.py +++ b/src/pydolphinscheduler/tasks/procedure.py @@ -17,7 +17,7 @@ """Task procedure.""" -from typing import Dict, Optional +from typing import Dict from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core.task import Task @@ -42,26 +42,15 @@ class Procedure(Task): _task_custom_attr = {"method"} - def __init__( - self, - name: str, - datasource_name: str, - method: str, - datasource_type: Optional[str] = None, - *args, - **kwargs - ): + def __init__(self, name: str, datasource_name: str, method: str, *args, **kwargs): super().__init__(name, TaskType.PROCEDURE, *args, **kwargs) self.datasource_name = datasource_name - self.datasource_type = datasource_type self.method = method @property def datasource(self) -> Dict: """Get datasource for procedure task.""" - datasource_task_u = Datasource.get_task_usage_4j( - self.datasource_name, self.datasource_type - ) + datasource_task_u = Datasource.get_task_usage_4j(self.datasource_name) return { "datasource": datasource_task_u.id, "type": datasource_task_u.type, diff --git a/src/pydolphinscheduler/tasks/python.py b/src/pydolphinscheduler/tasks/python.py index c1b2558..61c31f0 100644 --- a/src/pydolphinscheduler/tasks/python.py +++ b/src/pydolphinscheduler/tasks/python.py @@ -21,18 +21,19 @@ import logging import re import types from pathlib import Path -from typing import Union +from typing import Dict, List, Optional, Union from stmdency.extractor import Extractor from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core.task import Task from pydolphinscheduler.exceptions import PyDSParamException +from pydolphinscheduler.tasks.mixin.datasource_list_mixin import DatasourceListMixin log = logging.getLogger(__file__) -class Python(Task): +class Python(Task, DatasourceListMixin): """Task Python object, declare behavior for Python task to dolphinscheduler. Python task support two types of parameters for :param:``definition``, and here is an example: @@ -57,15 +58,23 @@ class Python(Task): want to execute. """ - _task_custom_attr = {"raw_script"} + _task_custom_attr = { + "raw_script", + } ext: set = {".py"} ext_attr: Union[str, types.FunctionType] = "_definition" def __init__( - self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs + self, + name: str, + definition: Union[str, types.FunctionType], + datasource_name: Optional[List[str]] = None, + *args, + **kwargs, ): self._definition = definition + self.datasource_name = datasource_name super().__init__(name, TaskType.PYTHON, *args, **kwargs) def _build_exe_str(self) -> str: @@ -103,3 +112,15 @@ class Python(Task): "Parameter definition do not support % for now.", type(getattr(self, "definition")), ) + + @property + def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: + """Override Task.task_params for sql task. + + sql task have some specials attribute for task_params, and is odd if we + directly set as python property, so we Override Task.task_params here. + """ + params = super().task_params + if self.datasource_name: + params.update(self.get_datasource()) + return params diff --git a/src/pydolphinscheduler/tasks/shell.py b/src/pydolphinscheduler/tasks/shell.py index 36ec4e8..110cbe8 100644 --- a/src/pydolphinscheduler/tasks/shell.py +++ b/src/pydolphinscheduler/tasks/shell.py @@ -16,12 +16,15 @@ # under the License. """Task shell.""" +from typing import Dict, List, Optional from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core.task import Task +from pydolphinscheduler.tasks.mixin.datasource_list_mixin import DatasourceListMixin +from pydolphinscheduler.tasks.mixin.remote_connection_mixin import RemoteConnectionMixin -class Shell(Task): +class Shell(Task, DatasourceListMixin, RemoteConnectionMixin): """Task shell object, declare behavior for shell task to dolphinscheduler. :param name: A unique, meaningful string for the shell task. @@ -53,6 +56,30 @@ class Shell(Task): ext: set = {".sh", ".zsh"} ext_attr: str = "_raw_script" - def __init__(self, name: str, command: str, *args, **kwargs): + def __init__( + self, + name: str, + command: str, + datasource_name: Optional[List[str]] = None, + remote_connection: Optional[str] = None, + *args, + **kwargs + ): self._raw_script = command + self.datasource_name = datasource_name + self.remote_connection = remote_connection super().__init__(name, TaskType.SHELL, *args, **kwargs) + + @property + def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: + """Override Task.task_params for sql task. + + sql task have some specials attribute for task_params, and is odd if we + directly set as python property, so we Override Task.task_params here. + """ + params = super().task_params + if self.datasource_name: + params.update(self.get_datasource()) + if self.remote_connection: + params.update(self.get_remote_connection()) + return params diff --git a/src/pydolphinscheduler/tasks/sql.py b/src/pydolphinscheduler/tasks/sql.py index a320d43..3d7f4d8 100644 --- a/src/pydolphinscheduler/tasks/sql.py +++ b/src/pydolphinscheduler/tasks/sql.py @@ -79,7 +79,6 @@ class Sql(Task): name: str, datasource_name: str, sql: str, - datasource_type: Optional[str] = None, sql_type: Optional[str] = None, pre_statements: Union[str, Sequence[str], None] = None, post_statements: Union[str, Sequence[str], None] = None, @@ -91,7 +90,6 @@ class Sql(Task): super().__init__(name, TaskType.SQL, *args, **kwargs) self.param_sql_type = sql_type self.datasource_name = datasource_name - self.datasource_type = datasource_type self.pre_statements = self.get_stm_list(pre_statements) self.post_statements = self.get_stm_list(post_statements) self.display_rows = display_rows @@ -139,9 +137,7 @@ class Sql(Task): @property def datasource(self) -> Dict: """Get datasource for procedure sql.""" - datasource_task_u = Datasource.get_task_usage_4j( - self.datasource_name, self.datasource_type - ) + datasource_task_u = Datasource.get_task_usage_4j(self.datasource_name) return { "datasource": datasource_task_u.id, "type": datasource_task_u.type,
