This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch commercial
in repository 
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git

commit caa4133a838258430c655a2acd07b24a6612f4c4
Author: Jay Chung <[email protected]>
AuthorDate: Mon Sep 18 17:18:37 2023 +0800

    get source code of 4.0.2a11
---
 src/pydolphinscheduler/__init__.py                 |  2 +-
 src/pydolphinscheduler/core/workflow.py            | 49 ++++------------
 src/pydolphinscheduler/default_config.yaml         |  4 +-
 .../examples/task_datax_example.py                 |  3 +-
 .../{tutorial.py => task_python_example.py}        | 65 +++++++++++++++++-----
 .../{tutorial.py => task_shell_example.py}         | 51 +++++++++++------
 src/pydolphinscheduler/examples/tutorial.py        |  4 +-
 src/pydolphinscheduler/java_gateway.py             | 10 ++--
 src/pydolphinscheduler/models/datasource.py        | 57 +++++++------------
 src/pydolphinscheduler/tasks/datax.py              | 18 ++----
 src/pydolphinscheduler/tasks/mixin/__init__.py     |  0
 .../tasks/mixin/datasource_list_mixin.py           | 28 ++++++++++
 .../tasks/mixin/remote_connection_mixin.py         | 29 ++++++++++
 src/pydolphinscheduler/tasks/procedure.py          | 17 +-----
 src/pydolphinscheduler/tasks/python.py             | 29 ++++++++--
 src/pydolphinscheduler/tasks/shell.py              | 31 ++++++++++-
 src/pydolphinscheduler/tasks/sql.py                |  6 +-
 17 files changed, 244 insertions(+), 159 deletions(-)

diff --git a/src/pydolphinscheduler/__init__.py 
b/src/pydolphinscheduler/__init__.py
index 17d52ad..7419152 100644
--- a/src/pydolphinscheduler/__init__.py
+++ b/src/pydolphinscheduler/__init__.py
@@ -17,4 +17,4 @@
 
 """Init root of pydolphinscheduler."""
 
-__version__ = "v4.0.2a8"
+__version__ = "4.0.2a11"
diff --git a/src/pydolphinscheduler/core/workflow.py 
b/src/pydolphinscheduler/core/workflow.py
index 582faa6..e0e00b8 100644
--- a/src/pydolphinscheduler/core/workflow.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -22,13 +22,13 @@ from datetime import datetime
 from typing import Any, Dict, List, Optional, Set, Union
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.constants import Symbol, TaskType
+from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.resource import Resource
 from pydolphinscheduler.core.resource_plugin import ResourcePlugin
 from pydolphinscheduler.exceptions import PyDSParamException, 
PyDSTaskNoFoundException
 from pydolphinscheduler.java_gateway import gateway
 from pydolphinscheduler.models import Base, Project, User
-from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, 
conv_to_schedule
+from pydolphinscheduler.utils.date import conv_from_str
 
 
 class WorkflowContext:
@@ -119,8 +119,7 @@ class Workflow(Base):
         self,
         name: str,
         description: Optional[str] = None,
-        schedule: Optional[str] = None,
-        online_schedule: Optional[bool] = None,
+        timing_name: Optional[str] = None,
         start_time: Optional[Union[str, datetime]] = None,
         end_time: Optional[Union[str, datetime]] = None,
         timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE,
@@ -139,22 +138,9 @@ class Workflow(Base):
         **kwargs,
     ):
         super().__init__(name, description)
-        self.schedule = schedule.strip() if schedule else schedule
-        if (
-            self.schedule
-            and self.schedule.count(Symbol.BLANK) != 
self._EXPECT_SCHEDULE_CHAR_NUM - 1
-        ):
-            raise PyDSParamException(
-                "Invlaid parameter schedule, expect crontab char is %d but get 
%s",
-                self._EXPECT_SCHEDULE_CHAR_NUM,
-                schedule,
-            )
+        self.timing_name = timing_name.strip() if timing_name else timing_name
 
         #  handle workflow schedule state according to init value
-        if self.schedule and online_schedule is None:
-            self.online_schedule = True
-        else:
-            self.online_schedule = online_schedule or False
         self._start_time = start_time
         self._end_time = end_time
         self.timezone = timezone
@@ -190,6 +176,11 @@ class Workflow(Base):
         self._workflow_code = None
         self.resource_list = resource_list or []
 
+        # does nothing functional; kept only for air2phin migration
+        self.schedule = None
+        if 'schedule' in kwargs or 'schedule_interval' in kwargs:
+            self.schedule = kwargs.get('schedule') or kwargs.get('schedule_interval')
+
     def __enter__(self) -> "Workflow":
         WorkflowContext.set(self)
         return self
@@ -306,25 +297,6 @@ class Workflow(Base):
             self._handle_root_relation()
             return [tr.get_define() for tr in self._task_relations]
 
-    @property
-    def schedule_json(self) -> Optional[Dict]:
-        """Get schedule parameter json object. This is requests from java 
gateway interface."""
-        if not self.schedule:
-            return None
-        else:
-            start_time = conv_to_schedule(
-                self.start_time if self.start_time else datetime.now()
-            )
-            end_time = conv_to_schedule(
-                self.end_time if self.end_time else MAX_DATETIME
-            )
-            return {
-                "startTime": start_time,
-                "endTime": end_time,
-                "crontab": self.schedule,
-                "timezoneId": self.timezone,
-            }
-
     @property
     def task_list(self) -> List["Task"]:  # noqa: F821
         """Return list of tasks objects."""
@@ -451,8 +423,7 @@ class Workflow(Base):
             # TODO add serialization function
             json.dumps(self.task_relation_json),
             json.dumps(self.task_definition_json),
-            json.dumps(self.schedule_json) if self.schedule_json else None,
-            self.online_schedule,
+            self.timing_name,
             None,
         )
         return self._workflow_code
diff --git a/src/pydolphinscheduler/default_config.yaml 
b/src/pydolphinscheduler/default_config.yaml
index a382413..e75cec3 100644
--- a/src/pydolphinscheduler/default_config.yaml
+++ b/src/pydolphinscheduler/default_config.yaml
@@ -47,13 +47,13 @@ default:
     state: 1
   # Default value for dolphinscheduler's workflow object
   workflow:
-    project: project-pydolphin
+    project: pydolphinscheduler
     user: admin
     queue: queuePythonGateway
     worker_group: default
     # Release state of workflow, default value is ``online`` which mean 
setting workflow online when it submits
     # to Java gateway, if you want to set workflow offline set its value to 
``offline``
-    release_state: online
+    release_state: offline
     time_zone: Asia/Shanghai
     # Warning type of the workflow, default value is ``NONE`` mean do not warn 
user in any cases of workflow state,
     # change to ``FAILURE`` if you want to warn users when workflow failed. 
All available enum value are
diff --git a/src/pydolphinscheduler/examples/task_datax_example.py 
b/src/pydolphinscheduler/examples/task_datax_example.py
index 6fdf779..33511ab 100644
--- a/src/pydolphinscheduler/examples/task_datax_example.py
+++ b/src/pydolphinscheduler/examples/task_datax_example.py
@@ -74,6 +74,7 @@ JSON_TEMPLATE = {
 
 with Workflow(
     name="task_datax_example",
+    release_state="offline",
 ) as workflow:
     # This task synchronizes the data in `t_ds_project`
     # of `first_mysql` database to `target_project` of `second_mysql` database.
@@ -89,6 +90,6 @@ with Workflow(
 
     # You can custom json_template of datax to sync data. This task create a 
new
     # datax job same as task1, transfer record from `first_mysql` to 
`second_mysql`
-    task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
+    task2 = CustomDataX(name="task_custom_datax", config=JSON_TEMPLATE)
     workflow.run()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/tutorial.py 
b/src/pydolphinscheduler/examples/task_python_example.py
similarity index 64%
copy from src/pydolphinscheduler/examples/tutorial.py
copy to src/pydolphinscheduler/examples/task_python_example.py
index 2814caf..681867b 100644
--- a/src/pydolphinscheduler/examples/tutorial.py
+++ b/src/pydolphinscheduler/examples/task_python_example.py
@@ -30,37 +30,74 @@ task_parent -->                        -->  task_union
 it will instantiate and run all the task it have.
 """
 
+import time
+from datetime import datetime
+
 # [start tutorial]
 # [start package_import]
 # Import Workflow object to define your workflow attributes
 from pydolphinscheduler.core.workflow import Workflow
 
 # Import task Shell object cause we would create some shell tasks later
-from pydolphinscheduler.tasks.shell import Shell
+from pydolphinscheduler.tasks.func_wrap import task
+from pydolphinscheduler.tasks.python import Python
 
 # [end package_import]
 
+scope_global = "global-var"
+
+
+# [start task_declare]
+@task
+def print_something():
+    """First task in this workflow."""
+    print("hello python function wrap task")
+
+
+@task
+def depend_import():
+    """Depend on import module."""
+    time.sleep(2)
+
+
+definition_str = """
+from datetime import datetime
+
+def foo():
+    print("hello world in function foo")
+    print(f"Current time is {datetime.now()}")
+"""
+
+
+def bar():
+    print("hello world in function bar")
+    print(f"Current time is {datetime.now()}")
+
+
+# [end task_declare]
+
+
 # [start workflow_declare]
 with Workflow(
-    name="tutorial",
+    name="task_python",
     schedule="0 0 0 * * ? *",
     start_time="2021-01-01",
-    online_schedule=False,
-    release_state="offline",
 ) as workflow:
     # [end workflow_declare]
-    # [start task_declare]
-    task_parent = Shell(name="task_parent", command="echo hello 
pydolphinscheduler")
-    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
-    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
-    task_union = Shell(name="task_union", command="echo union")
-    # [end task_declare]
 
-    # [start task_relation_declare]
-    task_group = [task_child_one, task_child_two]
-    task_parent.set_downstream(task_group)
+    python_str = Python(
+        name="python_str",
+        definition=definition_str,
+    )
 
-    task_union << task_group
+    python_func = Python(
+        name="python_func",
+        definition=bar,
+        datasource_name=["mysql-meta"],
+    )
+
+    # [start task_relation_declare]
+    print_something() >> depend_import() >> python_str >> python_func
     # [end task_relation_declare]
 
     # [start submit_or_run]
diff --git a/src/pydolphinscheduler/examples/tutorial.py 
b/src/pydolphinscheduler/examples/task_shell_example.py
similarity index 65%
copy from src/pydolphinscheduler/examples/tutorial.py
copy to src/pydolphinscheduler/examples/task_shell_example.py
index 2814caf..7eda184 100644
--- a/src/pydolphinscheduler/examples/tutorial.py
+++ b/src/pydolphinscheduler/examples/task_shell_example.py
@@ -38,32 +38,47 @@ from pydolphinscheduler.core.workflow import Workflow
 # Import task Shell object cause we would create some shell tasks later
 from pydolphinscheduler.tasks.shell import Shell
 
-# [end package_import]
-
 # [start workflow_declare]
 with Workflow(
-    name="tutorial",
+    name="task_shell",
     schedule="0 0 0 * * ? *",
     start_time="2021-01-01",
-    online_schedule=False,
-    release_state="offline",
 ) as workflow:
     # [end workflow_declare]
-    # [start task_declare]
-    task_parent = Shell(name="task_parent", command="echo hello 
pydolphinscheduler")
-    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
-    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
-    task_union = Shell(name="task_union", command="echo union")
-    # [end task_declare]
+    simple = Shell(name="simple", command="echo simple")
+
+    datasource_source = Shell(
+        name="datasource_source",
+        command="""
+        echo "${getConnectionHost('mysql-meta')}"
+        echo "${getConnectionUsername('mysql-target')}"
+        """,
+        datasource_name=[
+            "mysql-meta",
+            "mysql-target",
+        ],
+    )
+
+    remote_connection = Shell(
+        name="remote_connection",
+        command="ls /tmp",
+        remote_connection="remote-ssh-ws3",
+    )
 
-    # [start task_relation_declare]
-    task_group = [task_child_one, task_child_two]
-    task_parent.set_downstream(task_group)
+    mixin = Shell(
+        name="mixin",
+        command="""
+            echo "${getConnectionHost('mysql-meta')}"
+            echo "${getConnectionUsername('mysql-target')}"
+            """,
+        remote_connection="remote-ssh-ws3",
+        datasource_name=[
+            "mysql-meta",
+            "mysql-target",
+        ],
+    )
 
-    task_union << task_group
-    # [end task_relation_declare]
+    simple >> datasource_source >> remote_connection >> mixin
 
     # [start submit_or_run]
     workflow.submit()
-    # [end submit_or_run]
-# [end tutorial]
diff --git a/src/pydolphinscheduler/examples/tutorial.py 
b/src/pydolphinscheduler/examples/tutorial.py
index 2814caf..11b3d3d 100644
--- a/src/pydolphinscheduler/examples/tutorial.py
+++ b/src/pydolphinscheduler/examples/tutorial.py
@@ -43,10 +43,8 @@ from pydolphinscheduler.tasks.shell import Shell
 # [start workflow_declare]
 with Workflow(
     name="tutorial",
-    schedule="0 0 0 * * ? *",
     start_time="2021-01-01",
-    online_schedule=False,
-    release_state="offline",
+    release_state="online",
 ) as workflow:
     # [end workflow_declare]
     # [start task_declare]
diff --git a/src/pydolphinscheduler/java_gateway.py 
b/src/pydolphinscheduler/java_gateway.py
index 429f1ac..4b1476e 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -88,7 +88,7 @@ class GatewayEntryPoint:
         """Get the java gateway version, expected to be equal with 
pydolphinscheduler."""
         return self.gateway.entry_point.getGatewayVersion()
 
-    def get_datasource(self, name: str, type: Optional[str] = None):
+    def get_datasource(self, name: str):
         """Get single datasource by java gateway.
 
         Will use datasource_name to query database, and then filter by 
datasource_type if provided.
@@ -96,7 +96,7 @@ class GatewayEntryPoint:
         :param name: datasource name of the datasource to be queried
         :param type: datasource type of the datasource, only used to filter 
the result.
         """
-        return self.gateway.entry_point.getDatasource(name, type)
+        return self.gateway.entry_point.getDatasource(name)
 
     def get_resources_file_info(self, program_type: str, main_package: str):
         """Get resources file info through java gateway."""
@@ -251,8 +251,7 @@ class GatewayEntryPoint:
         release_state: int,
         task_relation_json: str,
         task_definition_json: str,
-        schedule: Optional[str] = None,
-        online_schedule: Optional[bool] = None,
+        timingName: Optional[str] = None,
         other_params_json: Optional[str] = None,
     ):
         """Create or update workflow through java gateway."""
@@ -262,8 +261,7 @@ class GatewayEntryPoint:
             name,
             description,
             global_params,
-            schedule,
-            online_schedule,
+            timingName,
             warning_type,
             warning_group_id,
             timeout,
diff --git a/src/pydolphinscheduler/models/datasource.py 
b/src/pydolphinscheduler/models/datasource.py
index 7c70902..3b55f21 100644
--- a/src/pydolphinscheduler/models/datasource.py
+++ b/src/pydolphinscheduler/models/datasource.py
@@ -20,8 +20,6 @@ import json
 import re
 from typing import NamedTuple, Optional
 
-from py4j.java_gateway import JavaObject
-
 from pydolphinscheduler.java_gateway import gateway
 from pydolphinscheduler.models.connection import Connection
 from pydolphinscheduler.models.meta import ModelMeta
@@ -32,6 +30,7 @@ class TaskUsage(NamedTuple):
 
     id: str
     type: str
+    name: str
 
 
 class Datasource(metaclass=ModelMeta):
@@ -91,52 +90,46 @@ class Datasource(metaclass=ModelMeta):
 
     def __init__(
         self,
-        st_db_type: str,
-        name: str,
-        connection_params: str,
-        user_id: Optional[int] = None,
+        datasource_name: str,
+        plugin_name: str,
+        plugin_version: str,
+        datasource_config: str = None,
         id_: Optional[int] = None,
-        note: Optional[str] = None,
+        description: Optional[str] = None,
     ):
         self.id = id_
-        self.name = name
-        self.note = note
-        # TODO try to handle type_ in metaclass
-        self.st_db_type: JavaObject = st_db_type
-        self.user_id = user_id
-        self.connection_params = connection_params
+        self.datasource_name = datasource_name
+        self.plugin_name = plugin_name
+        self.plugin_version = plugin_version
+        self.datasource_config = datasource_config
+        self.description = description
 
     @classmethod
-    def get(
-        cls, datasource_name: str, datasource_type: Optional[str] = None
-    ) -> "Datasource":
+    def get(cls, datasource_name: str) -> "Datasource":
         """Get single datasource.
 
         :param datasource_name: datasource name
         :param datasource_type: datasource type, if not provided, will get 
datasource by name only
         """
-        datasource = gateway.get_datasource(datasource_name, datasource_type)
+        datasource = gateway.get_datasource(datasource_name)
         if datasource is None:
-            raise ValueError(
-                f"Datasource with name: {datasource_name} and type: 
{datasource_type} not found."
-            )
+            raise ValueError(f"Datasource with name: {datasource_name} not found.")
         return datasource
 
     @classmethod
-    def get_task_usage_4j(
-        cls, datasource_name: str, datasource_type: Optional[str] = None
-    ) -> TaskUsage:
+    def get_task_usage_4j(cls, datasource_name: str) -> TaskUsage:
         """Get the necessary information of datasource for task usage in web 
UI."""
-        datasource: "Datasource" = cls.get(datasource_name, datasource_type)
+        datasource: "Datasource" = cls.get(datasource_name)
         return TaskUsage(
             id=str(datasource.id),
-            type=datasource.type,
+            type=datasource.plugin_name,
+            name=datasource.datasource_name,
         )
 
     @property
     def connection(self) -> Connection:
-        """Parse dolphinscheduler connection_params to Connection."""
-        data = json.loads(self.connection_params)
+        """Parse dolphinscheduler datasource_config to Connection."""
+        data = json.loads(self.datasource_config)
 
         pattern_match = self._PATTERN.match(
             data.get("jdbcUrl", data.get("url", None))
@@ -150,16 +143,6 @@ class Datasource(metaclass=ModelMeta):
             password=data.get("password", None),
         )
 
-    @property
-    def type(self) -> str:
-        """Property datasource type."""
-        return self.st_db_type.getDescp()
-
-    @property
-    def type_code(self) -> str:
-        """Property datasource type."""
-        return self.st_db_type.getCode()
-
     @property
     def host(self) -> str:
         """Property datasource host, such as ``127.0.0.1`` or 
``localhosts``."""
diff --git a/src/pydolphinscheduler/tasks/datax.py 
b/src/pydolphinscheduler/tasks/datax.py
index 1dfa89c..1cb438c 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -16,7 +16,7 @@
 # under the License.
 
 """Task datax."""
-
+import json as json_pkg
 from typing import Dict, List, Optional
 
 from pydolphinscheduler.constants import TaskType
@@ -40,13 +40,13 @@ class CustomDataX(Task):
     def __init__(
         self,
         name: str,
-        json: str,
+        config: dict,
         xms: Optional[int] = 1,
         xmx: Optional[int] = 1,
         *args,
         **kwargs
     ):
-        self._json = json
+        self._json = json_pkg.dumps(config, indent=2)
         super().__init__(name, TaskType.DATAX, *args, **kwargs)
         self.custom_config = self.CUSTOM_CONFIG
         self.xms = xms
@@ -98,8 +98,6 @@ class DataX(Task):
         datatarget_name: str,
         sql: str,
         target_table: str,
-        datasource_type: Optional[str] = None,
-        datatarget_type: Optional[str] = None,
         job_speed_byte: Optional[int] = 0,
         job_speed_record: Optional[int] = 1000,
         pre_statements: Optional[List[str]] = None,
@@ -112,9 +110,7 @@ class DataX(Task):
         self._sql = sql
         super().__init__(name, TaskType.DATAX, *args, **kwargs)
         self.custom_config = self.CUSTOM_CONFIG
-        self.datasource_type = datasource_type
         self.datasource_name = datasource_name
-        self.datatarget_type = datatarget_type
         self.datatarget_name = datatarget_name
         self.target_table = target_table
         self.job_speed_byte = job_speed_byte
@@ -127,9 +123,7 @@ class DataX(Task):
     @property
     def source_params(self) -> Dict:
         """Get source params for datax task."""
-        datasource_task_u = Datasource.get_task_usage_4j(
-            self.datasource_name, self.datasource_type
-        )
+        datasource_task_u = Datasource.get_task_usage_4j(self.datasource_name)
         return {
             "dsType": datasource_task_u.type,
             "dataSource": datasource_task_u.id,
@@ -138,9 +132,7 @@ class DataX(Task):
     @property
     def target_params(self) -> Dict:
         """Get target params for datax task."""
-        datasource_task_u = Datasource.get_task_usage_4j(
-            self.datatarget_name, self.datatarget_type
-        )
+        datasource_task_u = Datasource.get_task_usage_4j(self.datatarget_name)
         return {
             "dtType": datasource_task_u.type,
             "dataTarget": datasource_task_u.id,
diff --git a/src/pydolphinscheduler/tasks/mixin/__init__.py 
b/src/pydolphinscheduler/tasks/mixin/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/pydolphinscheduler/tasks/mixin/datasource_list_mixin.py 
b/src/pydolphinscheduler/tasks/mixin/datasource_list_mixin.py
new file mode 100644
index 0000000..d94c0f3
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/mixin/datasource_list_mixin.py
@@ -0,0 +1,28 @@
+from typing import Dict, List, NamedTuple, Optional
+
+from pydolphinscheduler.models.datasource import Datasource
+
+
+class DatasourceType(NamedTuple):
+    type: str
+    datasource: str
+
+
+class DatasourceList(NamedTuple):
+    dataSourceList: List[Dict]
+
+
+class DatasourceListMixin:
+    datasource_name: Optional[List[str]]
+
+    def get_datasource(self) -> Dict:
+        datasource_list_detail = []
+        for name in self.datasource_name:
+            datasource_task_u = Datasource.get_task_usage_4j(name)
+            datasource_itme = DatasourceType(
+                type=datasource_task_u.type, datasource=datasource_task_u.id
+            )
+            datasource_list_detail.append(datasource_itme._asdict())
+
+        datasource_list = DatasourceList(dataSourceList=datasource_list_detail)
+        return datasource_list._asdict()
diff --git a/src/pydolphinscheduler/tasks/mixin/remote_connection_mixin.py 
b/src/pydolphinscheduler/tasks/mixin/remote_connection_mixin.py
new file mode 100644
index 0000000..3890a47
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/mixin/remote_connection_mixin.py
@@ -0,0 +1,29 @@
+from typing import Dict, NamedTuple, Optional
+
+from pydolphinscheduler.models.datasource import Datasource
+
+
+class RemoteConnectionType(NamedTuple):
+    open: bool
+    datasourceType: str
+    datasourceId: str
+
+
+class RemoteConnectionList(NamedTuple):
+    remoteConnection: Dict
+
+
+class RemoteConnectionMixin:
+    remote_connection: Optional[str]
+
+    def get_remote_connection(self) -> Dict:
+        if self.remote_connection is not None:
+            connection = Datasource.get_task_usage_4j(self.remote_connection)
+            connection_itme = RemoteConnectionType(
+                open=True, datasourceType=connection.type, datasourceId=connection.id
+            )
+
+            datasource_list = RemoteConnectionList(
+                remoteConnection=connection_itme._asdict()
+            )
+            return datasource_list._asdict()
diff --git a/src/pydolphinscheduler/tasks/procedure.py 
b/src/pydolphinscheduler/tasks/procedure.py
index 5a9d87b..7620f4d 100644
--- a/src/pydolphinscheduler/tasks/procedure.py
+++ b/src/pydolphinscheduler/tasks/procedure.py
@@ -17,7 +17,7 @@
 
 """Task procedure."""
 
-from typing import Dict, Optional
+from typing import Dict
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -42,26 +42,15 @@ class Procedure(Task):
 
     _task_custom_attr = {"method"}
 
-    def __init__(
-        self,
-        name: str,
-        datasource_name: str,
-        method: str,
-        datasource_type: Optional[str] = None,
-        *args,
-        **kwargs
-    ):
+    def __init__(self, name: str, datasource_name: str, method: str, *args, 
**kwargs):
         super().__init__(name, TaskType.PROCEDURE, *args, **kwargs)
         self.datasource_name = datasource_name
-        self.datasource_type = datasource_type
         self.method = method
 
     @property
     def datasource(self) -> Dict:
         """Get datasource for procedure task."""
-        datasource_task_u = Datasource.get_task_usage_4j(
-            self.datasource_name, self.datasource_type
-        )
+        datasource_task_u = Datasource.get_task_usage_4j(self.datasource_name)
         return {
             "datasource": datasource_task_u.id,
             "type": datasource_task_u.type,
diff --git a/src/pydolphinscheduler/tasks/python.py 
b/src/pydolphinscheduler/tasks/python.py
index c1b2558..61c31f0 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -21,18 +21,19 @@ import logging
 import re
 import types
 from pathlib import Path
-from typing import Union
+from typing import Dict, List, Optional, Union
 
 from stmdency.extractor import Extractor
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.mixin.datasource_list_mixin import 
DatasourceListMixin
 
 log = logging.getLogger(__file__)
 
 
-class Python(Task):
+class Python(Task, DatasourceListMixin):
     """Task Python object, declare behavior for Python task to 
dolphinscheduler.
 
     Python task support two types of parameters for :param:``definition``, and 
here is an example:
@@ -57,15 +58,23 @@ class Python(Task):
         want to execute.
     """
 
-    _task_custom_attr = {"raw_script"}
+    _task_custom_attr = {
+        "raw_script",
+    }
 
     ext: set = {".py"}
     ext_attr: Union[str, types.FunctionType] = "_definition"
 
     def __init__(
-        self, name: str, definition: Union[str, types.FunctionType], *args, 
**kwargs
+        self,
+        name: str,
+        definition: Union[str, types.FunctionType],
+        datasource_name: Optional[List[str]] = None,
+        *args,
+        **kwargs,
     ):
         self._definition = definition
+        self.datasource_name = datasource_name
         super().__init__(name, TaskType.PYTHON, *args, **kwargs)
 
     def _build_exe_str(self) -> str:
@@ -103,3 +112,15 @@ class Python(Task):
                 "Parameter definition do not support % for now.",
                 type(getattr(self, "definition")),
             )
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for python task.
+
+        When ``datasource_name`` is set, the result of ``get_datasource()`` is
+        merged into the params returned by Task.task_params.
+        """
+        params = super().task_params
+        if self.datasource_name:
+            params.update(self.get_datasource())
+        return params
diff --git a/src/pydolphinscheduler/tasks/shell.py 
b/src/pydolphinscheduler/tasks/shell.py
index 36ec4e8..110cbe8 100644
--- a/src/pydolphinscheduler/tasks/shell.py
+++ b/src/pydolphinscheduler/tasks/shell.py
@@ -16,12 +16,15 @@
 # under the License.
 
 """Task shell."""
+from typing import Dict, List, Optional
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.tasks.mixin.datasource_list_mixin import 
DatasourceListMixin
+from pydolphinscheduler.tasks.mixin.remote_connection_mixin import 
RemoteConnectionMixin
 
 
-class Shell(Task):
+class Shell(Task, DatasourceListMixin, RemoteConnectionMixin):
     """Task shell object, declare behavior for shell task to dolphinscheduler.
 
     :param name: A unique, meaningful string for the shell task.
@@ -53,6 +56,30 @@ class Shell(Task):
     ext: set = {".sh", ".zsh"}
     ext_attr: str = "_raw_script"
 
-    def __init__(self, name: str, command: str, *args, **kwargs):
+    def __init__(
+        self,
+        name: str,
+        command: str,
+        datasource_name: Optional[List[str]] = None,
+        remote_connection: Optional[str] = None,
+        *args,
+        **kwargs
+    ):
         self._raw_script = command
+        self.datasource_name = datasource_name
+        self.remote_connection = remote_connection
         super().__init__(name, TaskType.SHELL, *args, **kwargs)
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for shell task.
+
+        When ``datasource_name`` or ``remote_connection`` is set, the matching
+        mixin result is merged into the params returned by Task.task_params.
+        """
+        params = super().task_params
+        if self.datasource_name:
+            params.update(self.get_datasource())
+        if self.remote_connection:
+            params.update(self.get_remote_connection())
+        return params
diff --git a/src/pydolphinscheduler/tasks/sql.py 
b/src/pydolphinscheduler/tasks/sql.py
index a320d43..3d7f4d8 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -79,7 +79,6 @@ class Sql(Task):
         name: str,
         datasource_name: str,
         sql: str,
-        datasource_type: Optional[str] = None,
         sql_type: Optional[str] = None,
         pre_statements: Union[str, Sequence[str], None] = None,
         post_statements: Union[str, Sequence[str], None] = None,
@@ -91,7 +90,6 @@ class Sql(Task):
         super().__init__(name, TaskType.SQL, *args, **kwargs)
         self.param_sql_type = sql_type
         self.datasource_name = datasource_name
-        self.datasource_type = datasource_type
         self.pre_statements = self.get_stm_list(pre_statements)
         self.post_statements = self.get_stm_list(post_statements)
         self.display_rows = display_rows
@@ -139,9 +137,7 @@ class Sql(Task):
     @property
     def datasource(self) -> Dict:
         """Get datasource for procedure sql."""
-        datasource_task_u = Datasource.get_task_usage_4j(
-            self.datasource_name, self.datasource_type
-        )
+        datasource_task_u = Datasource.get_task_usage_4j(self.datasource_name)
         return {
             "datasource": datasource_task_u.id,
             "type": datasource_task_u.type,

Reply via email to