This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new e1d1527 feat: Apply PEP-563 for codebase (#128)
e1d1527 is described below
commit e1d15274a6ffc2cf2ee3236d3ba3e8a0b1cf70d3
Author: Jay Chung <[email protected]>
AuthorDate: Mon Dec 25 18:35:00 2023 +0800
feat: Apply PEP-563 for codebase (#128)
---
.ruff.toml | 2 +
setup.py | 5 +-
src/pydolphinscheduler/core/engine.py | 8 +-
src/pydolphinscheduler/core/resource.py | 8 +-
src/pydolphinscheduler/core/task.py | 97 +++++++++++-----------
src/pydolphinscheduler/core/workflow.py | 70 ++++++++--------
src/pydolphinscheduler/core/yaml_workflow.py | 6 +-
src/pydolphinscheduler/java_gateway.py | 29 +++----
src/pydolphinscheduler/models/base.py | 10 +--
src/pydolphinscheduler/models/base_side.py | 4 +-
src/pydolphinscheduler/models/datasource.py | 21 +++--
src/pydolphinscheduler/models/meta.py | 6 +-
src/pydolphinscheduler/models/project.py | 8 +-
src/pydolphinscheduler/models/queue.py | 4 +-
src/pydolphinscheduler/models/tenant.py | 12 +--
src/pydolphinscheduler/models/user.py | 18 ++--
src/pydolphinscheduler/models/worker_group.py | 4 +-
.../resources_plugin/base/bucket.py | 23 ++---
.../resources_plugin/base/git.py | 38 +++++----
src/pydolphinscheduler/resources_plugin/github.py | 9 +-
src/pydolphinscheduler/resources_plugin/gitlab.py | 11 +--
src/pydolphinscheduler/resources_plugin/oss.py | 9 +-
src/pydolphinscheduler/resources_plugin/s3.py | 11 +--
src/pydolphinscheduler/tasks/condition.py | 10 +--
src/pydolphinscheduler/tasks/datax.py | 32 +++----
src/pydolphinscheduler/tasks/dependent.py | 16 ++--
src/pydolphinscheduler/tasks/dvc.py | 9 +-
src/pydolphinscheduler/tasks/flink.py | 28 +++----
src/pydolphinscheduler/tasks/http.py | 16 ++--
src/pydolphinscheduler/tasks/map_reduce.py | 12 +--
src/pydolphinscheduler/tasks/mlflow.py | 49 +++++------
src/pydolphinscheduler/tasks/procedure.py | 10 +--
src/pydolphinscheduler/tasks/python.py | 7 +-
src/pydolphinscheduler/tasks/pytorch.py | 14 ++--
src/pydolphinscheduler/tasks/spark.py | 26 +++---
src/pydolphinscheduler/tasks/sql.py | 21 ++---
src/pydolphinscheduler/tasks/sub_workflow.py | 4 +-
src/pydolphinscheduler/tasks/switch.py | 13 ++-
src/pydolphinscheduler/utils/file.py | 7 +-
src/pydolphinscheduler/utils/yaml_parser.py | 9 +-
tests/core/test_task.py | 7 +-
tests/core/test_workflow.py | 8 +-
tests/integration/test_process_definition.py | 5 +-
tests/tasks/test_condition.py | 11 +--
tests/tasks/test_dependent.py | 23 ++---
tests/tasks/test_switch.py | 6 +-
tests/testing/docker_wrapper.py | 4 +-
tests/testing/file.py | 6 +-
tests/testing/path.py | 4 +-
tests/utils/test_date.py | 4 +-
tests/utils/test_yaml_parser.py | 13 ++-
51 files changed, 404 insertions(+), 383 deletions(-)
diff --git a/.ruff.toml b/.ruff.toml
index 4e42e71..7ddd14b 100644
--- a/.ruff.toml
+++ b/.ruff.toml
@@ -1,5 +1,7 @@
src = ["src"]
+target-version = "py38"
+
# max-line-length = 110
line-length = 110
diff --git a/setup.py b/setup.py
index 7a75bc8..a56b672 100644
--- a/setup.py
+++ b/setup.py
@@ -16,13 +16,14 @@
# under the License.
"""The script for setting up pydolphinscheduler."""
+from __future__ import annotations
+
import logging
import os
import sys
from distutils.command.sdist import sdist
from distutils.dir_util import remove_tree
from distutils.errors import DistutilsExecError
-from typing import List
from setuptools import Command, setup
@@ -38,7 +39,7 @@ class CleanCommand(Command):
"""Command to clean up python api before setup by running `python setup.py
clean`."""
description = "Clean up project root"
- user_options: List[str] = []
+ user_options: list[str] = []
clean_list = [
"build",
"htmlcov",
diff --git a/src/pydolphinscheduler/core/engine.py
b/src/pydolphinscheduler/core/engine.py
index 8cf8469..3e36820 100644
--- a/src/pydolphinscheduler/core/engine.py
+++ b/src/pydolphinscheduler/core/engine.py
@@ -17,7 +17,7 @@
"""Module engine."""
-from typing import Dict, Optional
+from __future__ import annotations
from py4j.protocol import Py4JJavaError
@@ -47,9 +47,9 @@ class Engine(Task):
task_type: str,
main_class: str,
main_package: str,
- program_type: Optional[ProgramType] = ProgramType.SCALA,
+ program_type: ProgramType | None = ProgramType.SCALA,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(name, task_type, *args, **kwargs)
self.main_class = main_class
@@ -76,7 +76,7 @@ class Engine(Task):
return self.get_resource_info(self.program_type,
self.main_package).get("id")
@property
- def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
dict:
"""Override Task.task_params for engine children task.
children task have some specials attribute for task_params, and is odd
if we
diff --git a/src/pydolphinscheduler/core/resource.py
b/src/pydolphinscheduler/core/resource.py
index 907114f..09b0b88 100644
--- a/src/pydolphinscheduler/core/resource.py
+++ b/src/pydolphinscheduler/core/resource.py
@@ -17,7 +17,7 @@
"""Module resource."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.java_gateway import gateway
@@ -38,9 +38,9 @@ class Resource(Base):
def __init__(
self,
name: str,
- content: Optional[str] = None,
- description: Optional[str] = None,
- user_name: Optional[str] = None,
+ content: str | None = None,
+ description: str | None = None,
+ user_name: str | None = None,
):
super().__init__(name, description)
self.content = content
diff --git a/src/pydolphinscheduler/core/task.py
b/src/pydolphinscheduler/core/task.py
index 5f9fccb..ac48ab8 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -16,12 +16,15 @@
# under the License.
"""DolphinScheduler Task and TaskRelation object."""
+
+from __future__ import annotations
+
import copy
import types
import warnings
+from collections.abc import Sequence
from datetime import timedelta
from logging import getLogger
-from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from pydolphinscheduler import configuration
from pydolphinscheduler.constants import (
@@ -72,7 +75,7 @@ class TaskRelation(Base):
self,
pre_task_code: int,
post_task_code: int,
- name: Optional[str] = None,
+ name: str | None = None,
):
super().__init__(name)
self.pre_task_code = pre_task_code
@@ -147,7 +150,7 @@ class Task(Base):
_task_custom_attr: set = set()
ext: set = None
- ext_attr: Union[str, types.FunctionType] = None
+ ext_attr: str | types.FunctionType = None
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
@@ -155,27 +158,27 @@ class Task(Base):
self,
name: str,
task_type: str,
- description: Optional[str] = None,
- flag: Optional[str] = TaskFlag.YES,
- task_priority: Optional[str] = TaskPriority.MEDIUM,
- worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
- environment_name: Optional[str] = None,
- task_group_id: Optional[int] = 0,
- task_group_priority: Optional[int] = 0,
- delay_time: Optional[int] = 0,
- fail_retry_times: Optional[int] = 0,
- fail_retry_interval: Optional[int] = 1,
- timeout_notify_strategy: Optional = None,
- timeout: Optional[Union[timedelta, int]] = None,
- workflow: Optional[Workflow] = None,
- resource_list: Optional[List] = None,
- dependence: Optional[Dict] = None,
- wait_start_timeout: Optional[Dict] = None,
- condition_result: Optional[Dict] = None,
- resource_plugin: Optional[ResourcePlugin] = None,
- is_cache: Optional[bool] = False,
- input_params: Optional[Dict] = None,
- output_params: Optional[Dict] = None,
+ description: str | None = None,
+ flag: str | None = TaskFlag.YES,
+ task_priority: str | None = TaskPriority.MEDIUM,
+ worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
+ environment_name: str | None = None,
+ task_group_id: int | None = 0,
+ task_group_priority: int | None = 0,
+ delay_time: int | None = 0,
+ fail_retry_times: int | None = 0,
+ fail_retry_interval: int | None = 1,
+ timeout_notify_strategy: str | None = None,
+ timeout: timedelta | int | None = None,
+ workflow: Workflow | None = None,
+ resource_list: list | None = None,
+ dependence: dict | None = None,
+ wait_start_timeout: dict | None = None,
+ condition_result: dict | None = None,
+ resource_plugin: ResourcePlugin | None = None,
+ is_cache: bool | None = False,
+ input_params: dict | None = None,
+ output_params: dict | None = None,
*args,
**kwargs,
):
@@ -192,7 +195,7 @@ class Task(Base):
self.fail_retry_interval = fail_retry_interval
self.delay_time = delay_time
self.timeout_notify_strategy = timeout_notify_strategy
- self._timeout: Union[timedelta, int] = timeout
+ self._timeout: timedelta | int = timeout
self._workflow = None
self._input_params = input_params or {}
self._output_params = output_params or {}
@@ -214,9 +217,9 @@ class Task(Base):
)
self._local_params = kwargs.get("local_params")
- self._upstream_task_codes: Set[int] = set()
- self._downstream_task_codes: Set[int] = set()
- self._task_relation: Set[TaskRelation] = set()
+ self._upstream_task_codes: set[int] = set()
+ self._downstream_task_codes: set[int] = set()
+ self._task_relation: set[TaskRelation] = set()
# move attribute code and version after _workflow and workflow declare
self.code, self.version = self.gen_code_and_version()
# Add task to workflow, maybe we could put into property workflow
latter
@@ -238,12 +241,12 @@ class Task(Base):
self.get_content()
@property
- def workflow(self) -> Optional[Workflow]:
+ def workflow(self) -> Workflow | None:
"""Get attribute workflow."""
return self._workflow
@workflow.setter
- def workflow(self, workflow: Optional[Workflow]):
+ def workflow(self, workflow: Workflow | None):
"""Set attribute workflow."""
self._workflow = workflow
@@ -270,7 +273,7 @@ class Task(Base):
raise PyDSParamException("is_cache must be a bool")
@property
- def resource_list(self) -> List[Dict[str, Resource]]:
+ def resource_list(self) -> list[dict[str, Resource]]:
"""Get task define attribute `resource_list`."""
resources = set()
for res in self._resource_list:
@@ -292,7 +295,7 @@ class Task(Base):
return [{ResourceKey.NAME: r} for r in resources]
@property
- def user_name(self) -> Optional[str]:
+ def user_name(self) -> str | None:
"""Return username of workflow."""
if self.workflow:
return self.workflow.user.name
@@ -300,16 +303,16 @@ class Task(Base):
raise PyDSParamException("`user_name` cannot be empty.")
@property
- def condition_result(self) -> Dict:
+ def condition_result(self) -> dict:
"""Get attribute condition_result."""
return self._condition_result
@condition_result.setter
- def condition_result(self, condition_result: Optional[Dict]):
+ def condition_result(self, condition_result: dict | None):
"""Set attribute condition_result."""
self._condition_result = condition_result
- def _get_attr(self) -> Set[str]:
+ def _get_attr(self) -> set[str]:
"""Get final task task_params attribute.
Base on `_task_default_attr`, append attribute from
`_task_custom_attr` and subtract attribute from
@@ -321,7 +324,7 @@ class Task(Base):
return attr
@property
- def task_params(self) -> Optional[Dict]:
+ def task_params(self) -> dict | None:
"""Get task parameter object.
Will get result to combine _task_custom_attr and custom_attr.
@@ -373,29 +376,27 @@ class Task(Base):
def __hash__(self):
return hash(self.code)
- def __lshift__(self, other: Union["Task", Sequence["Task"]]):
+ def __lshift__(self, other: Task | Sequence[Task]):
"""Implement Task << Task."""
self.set_upstream(other)
return other
- def __rshift__(self, other: Union["Task", Sequence["Task"]]):
+ def __rshift__(self, other: Task | Sequence[Task]):
"""Implement Task >> Task."""
self.set_downstream(other)
return other
- def __rrshift__(self, other: Union["Task", Sequence["Task"]]):
+ def __rrshift__(self, other: Task | Sequence[Task]):
"""Call for Task >> [Task] because list don't have __rshift__
operators."""
self.__lshift__(other)
return self
- def __rlshift__(self, other: Union["Task", Sequence["Task"]]):
+ def __rlshift__(self, other: Task | Sequence[Task]):
"""Call for Task << [Task] because list don't have __lshift__
operators."""
self.__rshift__(other)
return self
- def _set_deps(
- self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True
- ) -> None:
+ def _set_deps(self, tasks: Task | Sequence[Task], upstream: bool = True)
-> None:
"""Set parameter tasks dependent to current task.
it is a wrapper for :func:`set_upstream` and :func:`set_downstream`.
@@ -427,16 +428,16 @@ class Task(Base):
)
self.workflow._task_relations.add(task_relation)
- def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+ def set_upstream(self, tasks: Task | Sequence[Task]) -> None:
"""Set parameter tasks as upstream to current task."""
self._set_deps(tasks, upstream=True)
- def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+ def set_downstream(self, tasks: Task | Sequence[Task]) -> None:
"""Set parameter tasks as downstream to current task."""
self._set_deps(tasks, upstream=False)
# TODO code should better generate in bulk mode when :ref: workflow run
submit or start
- def gen_code_and_version(self) -> Tuple:
+ def gen_code_and_version(self) -> tuple:
"""Generate task code and version from java gateway.
If task name do not exists in workflow before, if will generate new
code and version id
@@ -474,7 +475,7 @@ class Task(Base):
def add_in(
self,
name: str,
- value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
+ value: int | str | float | bool | BaseDataType | None = None,
):
"""Add input parameters.
@@ -494,7 +495,7 @@ class Task(Base):
def add_out(
self,
name: str,
- value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
+ value: int | str | float | bool | BaseDataType | None = None,
):
"""Add output parameters.
diff --git a/src/pydolphinscheduler/core/workflow.py
b/src/pydolphinscheduler/core/workflow.py
index 99d14ac..7fac15b 100644
--- a/src/pydolphinscheduler/core/workflow.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -17,9 +17,11 @@
"""Module workflow, core class for workflow define."""
+from __future__ import annotations
+
import json
from datetime import datetime, timedelta
-from typing import Any, Dict, List, Optional, Set, Union
+from typing import Any
from pydolphinscheduler import configuration
from pydolphinscheduler.constants import Symbol, TaskType
@@ -39,15 +41,15 @@ from pydolphinscheduler.utils.date import (
class WorkflowContext:
"""Class workflow context, use when task get workflow from context
expression."""
- _context_managed_workflow: Optional["Workflow"] = None
+ _context_managed_workflow: Workflow | None = None
@classmethod
- def set(cls, workflow: "Workflow") -> None:
+ def set(cls, workflow: Workflow) -> None:
"""Set attribute self._context_managed_workflow."""
cls._context_managed_workflow = workflow
@classmethod
- def get(cls) -> Optional["Workflow"]:
+ def get(cls) -> Workflow | None:
"""Get attribute self._context_managed_workflow."""
return cls._context_managed_workflow
@@ -125,23 +127,23 @@ class Workflow(Base):
def __init__(
self,
name: str,
- description: Optional[str] = None,
- schedule: Optional[str] = None,
- online_schedule: Optional[bool] = None,
- start_time: Optional[Union[str, datetime]] = None,
- end_time: Optional[Union[str, datetime]] = None,
- timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE,
- user: Optional[str] = configuration.WORKFLOW_USER,
- project: Optional[str] = configuration.WORKFLOW_PROJECT,
- worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
- warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
- warning_group_id: Optional[int] = 0,
- execution_type: Optional[str] = configuration.WORKFLOW_EXECUTION_TYPE,
- timeout: Optional[Union[timedelta, int]] = 0,
- release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
- param: Optional[Dict] = None,
- resource_plugin: Optional[ResourcePlugin] = None,
- resource_list: Optional[List[Resource]] = None,
+ description: str | None = None,
+ schedule: str | None = None,
+ online_schedule: bool | None = None,
+ start_time: str | datetime | None = None,
+ end_time: str | datetime | None = None,
+ timezone: str | None = configuration.WORKFLOW_TIME_ZONE,
+ user: str | None = configuration.WORKFLOW_USER,
+ project: str | None = configuration.WORKFLOW_PROJECT,
+ worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
+ warning_type: str | None = configuration.WORKFLOW_WARNING_TYPE,
+ warning_group_id: int | None = 0,
+ execution_type: str | None = configuration.WORKFLOW_EXECUTION_TYPE,
+ timeout: timedelta | int | None = 0,
+ release_state: str | None = configuration.WORKFLOW_RELEASE_STATE,
+ param: dict | None = None,
+ resource_plugin: ResourcePlugin | None = None,
+ resource_list: list[Resource] | None = None,
*args,
**kwargs,
):
@@ -187,17 +189,17 @@ class Workflow(Base):
)
else:
self._execution_type = execution_type
- self._timeout: Union[timedelta, int] = timeout
+ self._timeout: timedelta | int = timeout
self._release_state = release_state
self.param = param
self.tasks: dict = {}
self.resource_plugin = resource_plugin
# TODO how to fix circle import
- self._task_relations: set["TaskRelation"] = set() # noqa: F821
+ self._task_relations: set[TaskRelation] = set() # noqa: F821
self._workflow_code = None
self.resource_list = resource_list or []
- def __enter__(self) -> "Workflow":
+ def __enter__(self) -> Workflow:
WorkflowContext.set(self)
return self
@@ -290,7 +292,7 @@ class Workflow(Base):
self._execution_type = val
@property
- def param_json(self) -> Optional[List[Dict]]:
+ def param_json(self) -> list[dict] | None:
"""Return param json base on self.param."""
# Handle empty dict and None value
if not self.param:
@@ -306,7 +308,7 @@ class Workflow(Base):
]
@property
- def task_definition_json(self) -> List[Dict]:
+ def task_definition_json(self) -> list[dict]:
"""Return all tasks definition in list of dict."""
if not self.tasks:
return [self.tasks]
@@ -314,7 +316,7 @@ class Workflow(Base):
return [task.get_define() for task in self.tasks.values()]
@property
- def task_relation_json(self) -> List[Dict]:
+ def task_relation_json(self) -> list[dict]:
"""Return all relation between tasks pair in list of dict."""
if not self.tasks:
return [self.tasks]
@@ -323,7 +325,7 @@ class Workflow(Base):
return [tr.get_define() for tr in self._task_relations]
@property
- def schedule_json(self) -> Optional[Dict]:
+ def schedule_json(self) -> dict | None:
"""Get schedule parameter json object. This is requests from java
gateway interface."""
if not self.schedule:
return None
@@ -342,7 +344,7 @@ class Workflow(Base):
}
@property
- def task_list(self) -> List["Task"]: # noqa: F821
+ def task_list(self) -> list[Task]: # noqa: F821
"""Return list of tasks objects."""
return list(self.tasks.values())
@@ -362,17 +364,17 @@ class Workflow(Base):
root_relation = TaskRelation(pre_task_code=0,
post_task_code=task.code)
self._task_relations.add(root_relation)
- def add_task(self, task: "Task") -> None: # noqa: F821
+ def add_task(self, task: Task) -> None: # noqa: F821
"""Add a single task to workflow."""
self.tasks[task.code] = task
task._workflow = self
- def add_tasks(self, tasks: List["Task"]) -> None: # noqa: F821
+ def add_tasks(self, tasks: list[Task]) -> None: # noqa: F821
"""Add task sequence to workflow, it a wrapper of :func:`add_task`."""
for task in tasks:
self.add_task(task)
- def get_task(self, code: str) -> "Task": # noqa: F821
+ def get_task(self, code: str) -> Task: # noqa: F821
"""Get task object from workflow by given code."""
if code not in self.tasks:
raise PyDSTaskNoFoundException(
@@ -382,7 +384,7 @@ class Workflow(Base):
return self.tasks[code]
# TODO which tying should return in this case
- def get_tasks_by_name(self, name: str) -> Set["Task"]: # noqa: F821
+ def get_tasks_by_name(self, name: str) -> set[Task]: # noqa: F821
"""Get tasks object by given name, if will return all tasks with this
name."""
find = set()
for task in self.tasks.values():
@@ -390,7 +392,7 @@ class Workflow(Base):
find.add(task)
return find
- def get_one_task_by_name(self, name: str) -> "Task": # noqa: F821
+ def get_one_task_by_name(self, name: str) -> Task: # noqa: F821
"""Get exact one task from workflow by given name.
Function always return one task even though this workflow have more
than one task with
diff --git a/src/pydolphinscheduler/core/yaml_workflow.py
b/src/pydolphinscheduler/core/yaml_workflow.py
index df8d65e..a445534 100644
--- a/src/pydolphinscheduler/core/yaml_workflow.py
+++ b/src/pydolphinscheduler/core/yaml_workflow.py
@@ -17,11 +17,13 @@
"""Parse YAML file to create workflow."""
+from __future__ import annotations
+
import logging
import os
import re
from pathlib import Path
-from typing import Any, Dict
+from typing import Any
from pydolphinscheduler import configuration, tasks
from pydolphinscheduler.constants import Symbol
@@ -250,7 +252,7 @@ class YamlWorkflow(YamlParser):
return content
- def parse_task(self, task_data: dict, name2task: Dict[str, Task]):
+ def parse_task(self, task_data: dict, name2task: dict[str, Task]):
"""Parse various types of tasks.
:param task_data: dict.
diff --git a/src/pydolphinscheduler/java_gateway.py
b/src/pydolphinscheduler/java_gateway.py
index 91243f6..43e3445 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -16,11 +16,12 @@
# under the License.
"""Module java gateway, contain gateway behavior."""
+from __future__ import annotations
import contextlib
import warnings
from logging import getLogger
-from typing import Any, Optional
+from typing import Any
from py4j.java_collections import JavaMap
from py4j.java_gateway import GatewayParameters, JavaGateway
@@ -36,7 +37,7 @@ logger = getLogger(__name__)
def gateway_result_checker(
result: JavaMap,
- msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS,
+ msg_check: str | None = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS,
) -> Any:
"""Check weather java gateway result success or not."""
if (
@@ -57,10 +58,10 @@ class GatewayEntryPoint:
def __init__(
self,
- address: Optional[str] = None,
- port: Optional[int] = None,
- auto_convert: Optional[bool] = True,
- auth_token: Optional[str] = None,
+ address: str | None = None,
+ port: int | None = None,
+ auto_convert: bool | None = True,
+ auth_token: str | None = None,
):
self._gateway = None
self.address = address or configuration.JAVA_GATEWAY_ADDRESS
@@ -110,7 +111,7 @@ class GatewayEntryPoint:
"""Get the java gateway version, expected to be equal with
pydolphinscheduler."""
return self.gateway.entry_point.getGatewayVersion()
- def get_datasource(self, name: str, type: Optional[str] = None):
+ def get_datasource(self, name: str, type: str | None = None):
"""Get single datasource by java gateway.
Will use datasource_name to query database, and then filter by
datasource_type if provided.
@@ -145,7 +146,7 @@ class GatewayEntryPoint:
)
def create_or_grant_project(
- self, user: str, name: str, description: Optional[str] = None
+ self, user: str, name: str, description: str | None = None
):
"""Create or grant project through java gateway."""
return self.gateway.entry_point.createOrGrantProject(user, name,
description)
@@ -167,7 +168,7 @@ class GatewayEntryPoint:
return self.gateway.entry_point.deleteProject(user, code)
def create_tenant(
- self, tenant_name: str, queue_name: str, description: Optional[str] =
None
+ self, tenant_name: str, queue_name: str, description: str | None = None
):
"""Create tenant through java gateway."""
return self.gateway.entry_point.createTenant(
@@ -188,7 +189,7 @@ class GatewayEntryPoint:
tenant_id: int,
code: str,
queue_id: int,
- description: Optional[str] = None,
+ description: str | None = None,
):
"""Update tenant through java gateway."""
return self.gateway.entry_point.updateTenant(
@@ -241,7 +242,7 @@ class GatewayEntryPoint:
self,
project_name: str,
workflow_name: str,
- task_name: Optional[str] = None,
+ task_name: str | None = None,
):
"""Get dependent info through java gateway."""
return self.gateway.entry_point.getDependentInfo(
@@ -269,9 +270,9 @@ class GatewayEntryPoint:
release_state: int,
task_relation_json: str,
task_definition_json: str,
- schedule: Optional[str] = None,
- online_schedule: Optional[bool] = None,
- other_params_json: Optional[str] = None,
+ schedule: str | None = None,
+ online_schedule: bool | None = None,
+ other_params_json: str | None = None,
):
"""Create or update workflow through java gateway."""
return self.gateway.entry_point.createOrUpdateWorkflow(
diff --git a/src/pydolphinscheduler/models/base.py
b/src/pydolphinscheduler/models/base.py
index 007edec..6b0801a 100644
--- a/src/pydolphinscheduler/models/base.py
+++ b/src/pydolphinscheduler/models/base.py
@@ -17,7 +17,7 @@
"""DolphinScheduler Base object."""
-from typing import Dict, Optional
+from __future__ import annotations
# from pydolphinscheduler.models.user import User
from pydolphinscheduler.utils.string import attr2camel
@@ -33,9 +33,9 @@ class Base:
_DEFINE_ATTR: set = set()
# Object default attribute, will add those attribute to `_DEFINE_ATTR`
when init assign missing.
- _DEFAULT_ATTR: Dict = {}
+ _DEFAULT_ATTR: dict = {}
- def __init__(self, name: str, description: Optional[str] = None):
+ def __init__(self, name: str, description: str | None = None):
self.name = name
self.description = description
@@ -49,7 +49,7 @@ class Base:
def get_define_custom(
self, camel_attr: bool = True, custom_attr: set = None
- ) -> Dict:
+ ) -> dict:
"""Get object definition attribute by given attr set."""
content = {}
for attr in custom_attr:
@@ -60,7 +60,7 @@ class Base:
content[attr] = val
return content
- def get_define(self, camel_attr: bool = True) -> Dict:
+ def get_define(self, camel_attr: bool = True) -> dict:
"""Get object definition attribute communicate to Java gateway server.
use attribute `self._DEFINE_ATTR` to determine which attributes should
including when
diff --git a/src/pydolphinscheduler/models/base_side.py
b/src/pydolphinscheduler/models/base_side.py
index 99b4007..2373ad6 100644
--- a/src/pydolphinscheduler/models/base_side.py
+++ b/src/pydolphinscheduler/models/base_side.py
@@ -17,7 +17,7 @@
"""Module for models object."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler import configuration
from pydolphinscheduler.models import Base
@@ -26,7 +26,7 @@ from pydolphinscheduler.models import Base
class BaseSide(Base):
"""Base class for models object, it declare base behavior for them."""
- def __init__(self, name: str, description: Optional[str] = None):
+ def __init__(self, name: str, description: str | None = None):
super().__init__(name, description)
@classmethod
diff --git a/src/pydolphinscheduler/models/datasource.py
b/src/pydolphinscheduler/models/datasource.py
index 5602c30..a7407a2 100644
--- a/src/pydolphinscheduler/models/datasource.py
+++ b/src/pydolphinscheduler/models/datasource.py
@@ -16,9 +16,11 @@
# under the License.
"""Module database."""
+from __future__ import annotations
+
import json
import re
-from typing import NamedTuple, Optional
+from dataclasses import dataclass
from py4j.java_gateway import JavaObject
@@ -27,7 +29,8 @@ from pydolphinscheduler.models.connection import Connection
from pydolphinscheduler.models.meta import ModelMeta
-class TaskUsage(NamedTuple):
+@dataclass
+class TaskUsage:
"""Class for task usage just like datasource in web ui."""
id: int
@@ -92,9 +95,9 @@ class Datasource(metaclass=ModelMeta):
type_: str,
name: str,
connection_params: str,
- user_id: Optional[int] = None,
- id_: Optional[int] = None,
- note: Optional[str] = None,
+ user_id: int | None = None,
+ id_: int | None = None,
+ note: str | None = None,
):
self.id = id_
self.name = name
@@ -106,8 +109,8 @@ class Datasource(metaclass=ModelMeta):
@classmethod
def get(
- cls, datasource_name: str, datasource_type: Optional[str] = None
- ) -> "Datasource":
+ cls, datasource_name: str, datasource_type: str | None = None
+ ) -> Datasource:
"""Get single datasource.
:param datasource_name: datasource name
@@ -122,10 +125,10 @@ class Datasource(metaclass=ModelMeta):
@classmethod
def get_task_usage_4j(
- cls, datasource_name: str, datasource_type: Optional[str] = None
+ cls, datasource_name: str, datasource_type: str | None = None
) -> TaskUsage:
"""Get the necessary information of datasource for task usage in web
UI."""
- datasource: "Datasource" = cls.get(datasource_name, datasource_type)
+ datasource: Datasource = cls.get(datasource_name, datasource_type)
return TaskUsage(
id=datasource.id,
type=datasource.type.upper(),
diff --git a/src/pydolphinscheduler/models/meta.py
b/src/pydolphinscheduler/models/meta.py
index 6aea866..f4da69c 100644
--- a/src/pydolphinscheduler/models/meta.py
+++ b/src/pydolphinscheduler/models/meta.py
@@ -22,10 +22,10 @@ This module contains the ModelMeta class, which is used to
convert ``py4j.java_g
server to get some resource from database, but you want to make sure the
return object is a in Python
object.
"""
+from __future__ import annotations
from functools import wraps
from inspect import signature
-from typing import Dict, Tuple
from py4j.java_gateway import JavaObject
@@ -38,7 +38,7 @@ class ModelMeta(type):
_FUNC_INIT = "__init__"
_PARAM_SELF = "self"
- def __new__(mcs, name: str, bases: Tuple, attrs: Dict):
+ def __new__(mcs, name: str, bases: tuple, attrs: dict):
"""Create a new class."""
if mcs._FUNC_INIT not in attrs:
raise TypeError(
@@ -59,7 +59,7 @@ class ModelMeta(type):
return super().__new__(mcs, name, bases, attrs)
@classmethod
- def j2p(mcs, cm: classmethod, name: str, attrs: Dict, params=None):
+ def j2p(mcs, cm: classmethod, name: str, attrs: dict, params=None):
"""Convert JavaObject to Python object according attribute in the
``__init__`` method.
``py4j.java_gateway.JavaObject`` return the Java object from the
DolphinScheduler server, we can
diff --git a/src/pydolphinscheduler/models/project.py
b/src/pydolphinscheduler/models/project.py
index 308cb66..6d69692 100644
--- a/src/pydolphinscheduler/models/project.py
+++ b/src/pydolphinscheduler/models/project.py
@@ -17,7 +17,7 @@
"""DolphinScheduler Project object."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler import configuration
from pydolphinscheduler.java_gateway import gateway
@@ -30,8 +30,8 @@ class Project(BaseSide):
def __init__(
self,
name: str = configuration.WORKFLOW_PROJECT,
- description: Optional[str] = None,
- code: Optional[int] = None,
+ description: str | None = None,
+ code: int | None = None,
):
super().__init__(name, description)
self.code = code
@@ -43,7 +43,7 @@ class Project(BaseSide):
# gateway_result_checker(result, None)
@classmethod
- def get_project_by_name(cls, user=configuration.USER_NAME, name=None) ->
"Project":
+ def get_project_by_name(cls, user=configuration.USER_NAME, name=None) ->
Project:
"""Get Project by name."""
project = gateway.query_project_by_name(user, name)
if project is None:
diff --git a/src/pydolphinscheduler/models/queue.py
b/src/pydolphinscheduler/models/queue.py
index e6da259..a6daa21 100644
--- a/src/pydolphinscheduler/models/queue.py
+++ b/src/pydolphinscheduler/models/queue.py
@@ -17,7 +17,7 @@
"""DolphinScheduler User object."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler import configuration
from pydolphinscheduler.models import BaseSide
@@ -29,6 +29,6 @@ class Queue(BaseSide):
def __init__(
self,
name: str = configuration.WORKFLOW_QUEUE,
- description: Optional[str] = "",
+ description: str | None = "",
):
super().__init__(name, description)
diff --git a/src/pydolphinscheduler/models/tenant.py
b/src/pydolphinscheduler/models/tenant.py
index 10882c2..5503a89 100644
--- a/src/pydolphinscheduler/models/tenant.py
+++ b/src/pydolphinscheduler/models/tenant.py
@@ -17,7 +17,7 @@
"""DolphinScheduler Tenant object."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler import configuration
from pydolphinscheduler.java_gateway import gateway
@@ -31,10 +31,10 @@ class Tenant(BaseSide):
self,
name: str = configuration.USER_TENANT,
queue: str = configuration.WORKFLOW_QUEUE,
- description: Optional[str] = None,
- tenant_id: Optional[int] = None,
- code: Optional[str] = None,
- user_name: Optional[str] = None,
+ description: str | None = None,
+ tenant_id: int | None = None,
+ code: str | None = None,
+ user_name: str | None = None,
):
super().__init__(name, description)
self.tenant_id = tenant_id
@@ -52,7 +52,7 @@ class Tenant(BaseSide):
# gateway_result_checker(result, None)
@classmethod
- def get_tenant(cls, code: str) -> "Tenant":
+ def get_tenant(cls, code: str) -> Tenant:
"""Get Tenant list."""
tenant = gateway.query_tenant(code)
if tenant is None:
diff --git a/src/pydolphinscheduler/models/user.py
b/src/pydolphinscheduler/models/user.py
index e58af46..26b56c6 100644
--- a/src/pydolphinscheduler/models/user.py
+++ b/src/pydolphinscheduler/models/user.py
@@ -17,7 +17,7 @@
"""DolphinScheduler User object."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler import configuration
from pydolphinscheduler.java_gateway import gateway
@@ -40,15 +40,15 @@ class User(BaseSide):
def __init__(
self,
name: str,
- password: Optional[str] = configuration.USER_PASSWORD,
- email: Optional[str] = configuration.USER_EMAIL,
- phone: Optional[str] = configuration.USER_PHONE,
- tenant: Optional[str] = configuration.USER_TENANT,
- queue: Optional[str] = configuration.WORKFLOW_QUEUE,
- status: Optional[int] = configuration.USER_STATE,
+ password: str | None = configuration.USER_PASSWORD,
+ email: str | None = configuration.USER_EMAIL,
+ phone: str | None = configuration.USER_PHONE,
+ tenant: str | None = configuration.USER_TENANT,
+ queue: str | None = configuration.WORKFLOW_QUEUE,
+ status: int | None = configuration.USER_STATE,
):
super().__init__(name)
- self.user_id: Optional[int] = None
+ self.user_id: int | None = None
self.password = password
self.email = email
self.phone = phone
@@ -79,7 +79,7 @@ class User(BaseSide):
# gateway_result_checker(result, None)
@classmethod
- def get_user(cls, user_id) -> "User":
+ def get_user(cls, user_id) -> User:
"""Get User."""
user = gateway.query_user(user_id)
if user is None:
diff --git a/src/pydolphinscheduler/models/worker_group.py
b/src/pydolphinscheduler/models/worker_group.py
index bc55eaf..188f1f4 100644
--- a/src/pydolphinscheduler/models/worker_group.py
+++ b/src/pydolphinscheduler/models/worker_group.py
@@ -17,7 +17,7 @@
"""DolphinScheduler Worker Group object."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler.models import BaseSide
@@ -25,6 +25,6 @@ from pydolphinscheduler.models import BaseSide
class WorkerGroup(BaseSide):
"""DolphinScheduler Worker Group object."""
- def __init__(self, name: str, address: str, description: Optional[str] =
None):
+ def __init__(self, name: str, address: str, description: str | None =
None):
super().__init__(name, description)
self.address = address
diff --git a/src/pydolphinscheduler/resources_plugin/base/bucket.py
b/src/pydolphinscheduler/resources_plugin/base/bucket.py
index a9d48a3..64be694 100644
--- a/src/pydolphinscheduler/resources_plugin/base/bucket.py
+++ b/src/pydolphinscheduler/resources_plugin/base/bucket.py
@@ -16,6 +16,9 @@
# under the License.
"""DolphinScheduler BucketFileInfo and Bucket object."""
+
+from __future__ import annotations
+
from abc import ABCMeta, abstractmethod
from typing import Optional
@@ -28,11 +31,7 @@ class BucketFileInfo:
"""
def __init__(
- self,
- bucket: Optional[str] = None,
- file_path: Optional[str] = None,
- *args,
- **kwargs
+ self, bucket: str | None = None, file_path: str | None = None, *args,
**kwargs
):
self.bucket = bucket
self.file_path = file_path
@@ -48,11 +47,11 @@ class OSSFileInfo(BucketFileInfo):
def __init__(
self,
- endpoint: Optional[str] = None,
- bucket: Optional[str] = None,
- file_path: Optional[str] = None,
+ endpoint: str | None = None,
+ bucket: str | None = None,
+ file_path: str | None = None,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(bucket=bucket, file_path=file_path, *args, **kwargs)
self.endpoint = endpoint
@@ -66,11 +65,7 @@ class S3FileInfo(BucketFileInfo):
"""
def __init__(
- self,
- bucket: Optional[str] = None,
- file_path: Optional[str] = None,
- *args,
- **kwargs
+ self, bucket: str | None = None, file_path: str | None = None, *args,
**kwargs
):
super().__init__(bucket=bucket, file_path=file_path, *args, **kwargs)
diff --git a/src/pydolphinscheduler/resources_plugin/base/git.py
b/src/pydolphinscheduler/resources_plugin/base/git.py
index 53b5509..1cdeb5b 100644
--- a/src/pydolphinscheduler/resources_plugin/base/git.py
+++ b/src/pydolphinscheduler/resources_plugin/base/git.py
@@ -17,6 +17,8 @@
"""DolphinScheduler GitFileInfo and Git object."""
+from __future__ import annotations
+
from abc import ABCMeta, abstractmethod
from typing import Optional
@@ -32,12 +34,12 @@ class GitFileInfo:
def __init__(
self,
- user: Optional[str] = None,
- repo_name: Optional[str] = None,
- branch: Optional[str] = None,
- file_path: Optional[str] = None,
+ user: str | None = None,
+ repo_name: str | None = None,
+ branch: str | None = None,
+ file_path: str | None = None,
*args,
- **kwargs
+ **kwargs,
):
self.user = user
self.repo_name = repo_name
@@ -56,12 +58,12 @@ class GitHubFileInfo(GitFileInfo):
def __init__(
self,
- user: Optional[str] = None,
- repo_name: Optional[str] = None,
- branch: Optional[str] = None,
- file_path: Optional[str] = None,
+ user: str | None = None,
+ repo_name: str | None = None,
+ branch: str | None = None,
+ file_path: str | None = None,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(
user=user,
@@ -69,7 +71,7 @@ class GitHubFileInfo(GitFileInfo):
branch=branch,
file_path=file_path,
*args,
- **kwargs
+ **kwargs,
)
@@ -85,13 +87,13 @@ class GitLabFileInfo(GitFileInfo):
def __init__(
self,
- host: Optional[str] = None,
- user: Optional[str] = None,
- repo_name: Optional[str] = None,
- branch: Optional[str] = None,
- file_path: Optional[str] = None,
+ host: str | None = None,
+ user: str | None = None,
+ repo_name: str | None = None,
+ branch: str | None = None,
+ file_path: str | None = None,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(
user=user,
@@ -99,7 +101,7 @@ class GitLabFileInfo(GitFileInfo):
branch=branch,
file_path=file_path,
*args,
- **kwargs
+ **kwargs,
)
self.host = host
diff --git a/src/pydolphinscheduler/resources_plugin/github.py
b/src/pydolphinscheduler/resources_plugin/github.py
index 4564864..369d377 100644
--- a/src/pydolphinscheduler/resources_plugin/github.py
+++ b/src/pydolphinscheduler/resources_plugin/github.py
@@ -16,8 +16,9 @@
# under the License.
"""DolphinScheduler github resource plugin."""
+from __future__ import annotations
+
import base64
-from typing import Optional
from urllib.parse import urljoin
import requests
@@ -34,13 +35,11 @@ class GitHub(ResourcePlugin, Git):
:param access_token: A string used for identity authentication of GitHub
private repository.
"""
- def __init__(
- self, prefix: str, access_token: Optional[str] = None, *args, **kwargs
- ):
+ def __init__(self, prefix: str, access_token: str | None = None, *args,
**kwargs):
super().__init__(prefix, *args, **kwargs)
self.access_token = access_token
- _git_file_info: Optional[GitHubFileInfo] = None
+ _git_file_info: GitHubFileInfo | None = None
def build_req_api(
self,
diff --git a/src/pydolphinscheduler/resources_plugin/gitlab.py
b/src/pydolphinscheduler/resources_plugin/gitlab.py
index fb01117..323b399 100644
--- a/src/pydolphinscheduler/resources_plugin/gitlab.py
+++ b/src/pydolphinscheduler/resources_plugin/gitlab.py
@@ -16,7 +16,8 @@
# under the License.
"""DolphinScheduler gitlab resource plugin."""
-from typing import Optional
+from __future__ import annotations
+
from urllib.parse import urljoin, urlparse
import gitlab
@@ -40,10 +41,10 @@ class GitLab(ResourcePlugin, Git):
def __init__(
self,
prefix: str,
- private_token: Optional[str] = None,
- oauth_token: Optional[str] = None,
- username: Optional[str] = None,
- password: Optional[str] = None,
+ private_token: str | None = None,
+ oauth_token: str | None = None,
+ username: str | None = None,
+ password: str | None = None,
*args,
**kwargs,
):
diff --git a/src/pydolphinscheduler/resources_plugin/oss.py
b/src/pydolphinscheduler/resources_plugin/oss.py
index 1a9acbb..871b8db 100644
--- a/src/pydolphinscheduler/resources_plugin/oss.py
+++ b/src/pydolphinscheduler/resources_plugin/oss.py
@@ -16,7 +16,8 @@
# under the License.
"""DolphinScheduler oss resource plugin."""
-from typing import Optional
+from __future__ import annotations
+
from urllib.parse import urljoin, urlparse
import oss2
@@ -37,8 +38,8 @@ class OSS(ResourcePlugin, Bucket):
def __init__(
self,
prefix: str,
- access_key_id: Optional[str] = None,
- access_key_secret: Optional[str] = None,
+ access_key_id: str | None = None,
+ access_key_secret: str | None = None,
*args,
**kwargs,
):
@@ -46,7 +47,7 @@ class OSS(ResourcePlugin, Bucket):
self.access_key_id = access_key_id
self.access_key_secret = access_key_secret
- _bucket_file_info: Optional[OSSFileInfo] = None
+ _bucket_file_info: OSSFileInfo | None = None
def get_bucket_file_info(self, path: str):
"""Get file information from the file url, like repository name, user,
branch, and file path."""
diff --git a/src/pydolphinscheduler/resources_plugin/s3.py
b/src/pydolphinscheduler/resources_plugin/s3.py
index da1fe83..5ff16e2 100644
--- a/src/pydolphinscheduler/resources_plugin/s3.py
+++ b/src/pydolphinscheduler/resources_plugin/s3.py
@@ -17,7 +17,8 @@
"""DolphinScheduler S3 resource plugin."""
-from typing import Optional
+from __future__ import annotations
+
from urllib.parse import urljoin
import boto3
@@ -38,16 +39,16 @@ class S3(ResourcePlugin, Bucket):
def __init__(
self,
prefix: str,
- access_key_id: Optional[str] = None,
- access_key_secret: Optional[str] = None,
+ access_key_id: str | None = None,
+ access_key_secret: str | None = None,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(prefix, *args, **kwargs)
self.access_key_id = access_key_id
self.access_key_secret = access_key_secret
- _bucket_file_info: Optional[S3FileInfo] = None
+ _bucket_file_info: S3FileInfo | None = None
def get_bucket_file_info(self, path: str):
"""Get file information from the file url, like repository name, user,
branch, and file path."""
diff --git a/src/pydolphinscheduler/tasks/condition.py
b/src/pydolphinscheduler/tasks/condition.py
index cb139f1..3d8f6f8 100644
--- a/src/pydolphinscheduler/tasks/condition.py
+++ b/src/pydolphinscheduler/tasks/condition.py
@@ -17,7 +17,7 @@
"""Task Conditions."""
-from typing import Dict, List
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
@@ -44,7 +44,7 @@ class Status(Base):
"""Get name for Status or its sub class."""
return cls.__name__.upper()
- def get_define(self, camel_attr: bool = True) -> List:
+ def get_define(self, camel_attr: bool = True) -> list:
"""Get status definition attribute communicate to Java gateway
server."""
content = []
for task in self.tasks:
@@ -123,7 +123,7 @@ class ConditionOperator(Base):
setattr(self, attr, result)
return attr
- def get_define(self, camel_attr=True) -> Dict:
+ def get_define(self, camel_attr=True) -> dict:
"""Overwrite Base.get_define to get task Condition specific get
define."""
attr = self.set_define_attr()
dependent_define_attr = self._DEFINE_ATTR.union({attr})
@@ -184,7 +184,7 @@ class Condition(Task):
self.set_downstream([self.success_task, self.failed_task])
@property
- def condition_result(self) -> Dict:
+ def condition_result(self) -> dict:
"""Get condition result define for java gateway."""
return {
"successNode": [self.success_task.code],
@@ -192,7 +192,7 @@ class Condition(Task):
}
@property
- def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
dict:
"""Override Task.task_params for Condition task.
Condition task have some specials attribute `dependence`, and in most
of the task
diff --git a/src/pydolphinscheduler/tasks/datax.py
b/src/pydolphinscheduler/tasks/datax.py
index 47b8b1f..148b4b2 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -17,7 +17,7 @@
"""Task datax."""
-from typing import Dict, List, Optional
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.mixin import WorkerResourceMixin
@@ -42,10 +42,10 @@ class CustomDataX(WorkerResourceMixin, Task):
self,
name: str,
json: str,
- xms: Optional[int] = 1,
- xmx: Optional[int] = 1,
+ xms: int | None = 1,
+ xmx: int | None = 1,
*args,
- **kwargs
+ **kwargs,
):
self._json = json
super().__init__(name, TaskType.DATAX, *args, **kwargs)
@@ -100,16 +100,16 @@ class DataX(WorkerResourceMixin, Task):
datatarget_name: str,
sql: str,
target_table: str,
- datasource_type: Optional[str] = None,
- datatarget_type: Optional[str] = None,
- job_speed_byte: Optional[int] = 0,
- job_speed_record: Optional[int] = 1000,
- pre_statements: Optional[List[str]] = None,
- post_statements: Optional[List[str]] = None,
- xms: Optional[int] = 1,
- xmx: Optional[int] = 1,
+ datasource_type: str | None = None,
+ datatarget_type: str | None = None,
+ job_speed_byte: int | None = 0,
+ job_speed_record: int | None = 1000,
+ pre_statements: list[str] | None = None,
+ post_statements: list[str] | None = None,
+ xms: int | None = 1,
+ xmx: int | None = 1,
*args,
- **kwargs
+ **kwargs,
):
self._sql = sql
super().__init__(name, TaskType.DATAX, *args, **kwargs)
@@ -128,7 +128,7 @@ class DataX(WorkerResourceMixin, Task):
self.add_attr(**kwargs)
@property
- def source_params(self) -> Dict:
+ def source_params(self) -> dict:
"""Get source params for datax task."""
datasource_task_u = Datasource.get_task_usage_4j(
self.datasource_name, self.datasource_type
@@ -139,7 +139,7 @@ class DataX(WorkerResourceMixin, Task):
}
@property
- def target_params(self) -> Dict:
+ def target_params(self) -> dict:
"""Get target params for datax task."""
datasource_task_u = Datasource.get_task_usage_4j(
self.datatarget_name, self.datatarget_type
@@ -150,7 +150,7 @@ class DataX(WorkerResourceMixin, Task):
}
@property
- def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
dict:
"""Override Task.task_params for datax task.
datax task have some specials attribute for task_params, and is odd if
we
diff --git a/src/pydolphinscheduler/tasks/dependent.py
b/src/pydolphinscheduler/tasks/dependent.py
index 98a76ef..6c5ac2b 100644
--- a/src/pydolphinscheduler/tasks/dependent.py
+++ b/src/pydolphinscheduler/tasks/dependent.py
@@ -16,9 +16,9 @@
# under the License.
"""Task dependent."""
+from __future__ import annotations
import warnings
-from typing import Dict, Optional, Tuple
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
@@ -91,9 +91,9 @@ class DependentItem(Base):
self,
project_name: str,
# TODO zhongjiajie should be also depeloped in 4.1.0
- workflow_name: Optional[str] = None,
- dependent_task_name: Optional[str] = DEPENDENT_ALL_TASK_IN_WORKFLOW,
- dependent_date: Optional[DependentDate] = DependentDate.TODAY,
+ workflow_name: str | None = None,
+ dependent_task_name: str | None = DEPENDENT_ALL_TASK_IN_WORKFLOW,
+ dependent_date: DependentDate | None = DependentDate.TODAY,
*args,
**kwargs,
):
@@ -170,7 +170,7 @@ class DependentItem(Base):
return self.dependent_task_name == DEPENDENT_ALL_TASK_IN_WORKFLOW
@property
- def code_parameter(self) -> Tuple:
+ def code_parameter(self) -> tuple:
"""Get name info parameter to query code."""
param = (
self.project_name,
@@ -179,7 +179,7 @@ class DependentItem(Base):
)
return param
- def get_code_from_gateway(self) -> Dict:
+ def get_code_from_gateway(self) -> dict:
"""Get project, definition, task code from given parameter."""
if self._code:
return self._code
@@ -241,7 +241,7 @@ class DependentOperator(Base):
setattr(self, attr, result)
return attr
- def get_define(self, camel_attr=True) -> Dict:
+ def get_define(self, camel_attr=True) -> dict:
"""Overwrite Base.get_define to get task dependent specific get
define."""
attr = self.set_define_attr()
dependent_define_attr = self._DEFINE_ATTR.union({attr})
@@ -280,7 +280,7 @@ class Dependent(Task):
self.dependence = dependence
@property
- def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
dict:
"""Override Task.task_params for dependent task.
Dependent task have some specials attribute `dependence`, and in most
of the task
diff --git a/src/pydolphinscheduler/tasks/dvc.py
b/src/pydolphinscheduler/tasks/dvc.py
index c5b5cd5..5cbc6f0 100644
--- a/src/pydolphinscheduler/tasks/dvc.py
+++ b/src/pydolphinscheduler/tasks/dvc.py
@@ -16,8 +16,9 @@
# under the License.
"""Task dvc."""
+from __future__ import annotations
+
from copy import deepcopy
-from typing import Dict
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
@@ -48,7 +49,7 @@ class BaseDVC(Task):
self.dvc_repository = repository
@property
- def task_params(self) -> Dict:
+ def task_params(self) -> dict:
"""Return task params."""
self._task_custom_attr = deepcopy(self._task_custom_attr)
self._task_custom_attr.update(self._child_task_dvc_attr)
@@ -86,7 +87,7 @@ class DVCDownload(BaseDVC):
data_path_in_worker: str,
version: str,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(name, repository, *args, **kwargs)
self.dvc_data_location = data_path_in_dvc_repository
@@ -115,7 +116,7 @@ class DVCUpload(BaseDVC):
version: str,
message: str,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(name, repository, *args, **kwargs)
self.dvc_data_location = data_path_in_dvc_repository
diff --git a/src/pydolphinscheduler/tasks/flink.py
b/src/pydolphinscheduler/tasks/flink.py
index 83cae95..fa6a15d 100644
--- a/src/pydolphinscheduler/tasks/flink.py
+++ b/src/pydolphinscheduler/tasks/flink.py
@@ -17,7 +17,7 @@
"""Task Flink."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.engine import Engine, ProgramType
@@ -58,19 +58,19 @@ class Flink(Engine):
name: str,
main_class: str,
main_package: str,
- program_type: Optional[ProgramType] = ProgramType.SCALA,
- deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
- flink_version: Optional[FlinkVersion] = FlinkVersion.LOW_VERSION,
- app_name: Optional[str] = None,
- job_manager_memory: Optional[str] = "1G",
- task_manager_memory: Optional[str] = "2G",
- slot: Optional[int] = 1,
- task_manager: Optional[int] = 2,
- parallelism: Optional[int] = 1,
- main_args: Optional[str] = None,
- others: Optional[str] = None,
+ program_type: ProgramType | None = ProgramType.SCALA,
+ deploy_mode: DeployMode | None = DeployMode.CLUSTER,
+ flink_version: FlinkVersion | None = FlinkVersion.LOW_VERSION,
+ app_name: str | None = None,
+ job_manager_memory: str | None = "1G",
+ task_manager_memory: str | None = "2G",
+ slot: int | None = 1,
+ task_manager: int | None = 2,
+ parallelism: int | None = 1,
+ main_args: str | None = None,
+ others: str | None = None,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(
name,
@@ -79,7 +79,7 @@ class Flink(Engine):
main_package,
program_type,
*args,
- **kwargs
+ **kwargs,
)
self.deploy_mode = deploy_mode
self.flink_version = flink_version
diff --git a/src/pydolphinscheduler/tasks/http.py
b/src/pydolphinscheduler/tasks/http.py
index 781333d..7849317 100644
--- a/src/pydolphinscheduler/tasks/http.py
+++ b/src/pydolphinscheduler/tasks/http.py
@@ -17,7 +17,7 @@
"""Task shell."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
@@ -67,14 +67,14 @@ class Http(Task):
self,
name: str,
url: str,
- http_method: Optional[str] = HttpMethod.GET,
- http_params: Optional[str] = None,
- http_check_condition: Optional[str] =
HttpCheckCondition.STATUS_CODE_DEFAULT,
- condition: Optional[str] = None,
- connect_timeout: Optional[int] = 60000,
- socket_timeout: Optional[int] = 60000,
+ http_method: str | None = HttpMethod.GET,
+ http_params: str | None = None,
+ http_check_condition: str | None =
HttpCheckCondition.STATUS_CODE_DEFAULT,
+ condition: str | None = None,
+ connect_timeout: int | None = 60000,
+ socket_timeout: int | None = 60000,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(name, TaskType.HTTP, *args, **kwargs)
self.url = url
diff --git a/src/pydolphinscheduler/tasks/map_reduce.py
b/src/pydolphinscheduler/tasks/map_reduce.py
index 5050bd3..146720e 100644
--- a/src/pydolphinscheduler/tasks/map_reduce.py
+++ b/src/pydolphinscheduler/tasks/map_reduce.py
@@ -17,7 +17,7 @@
"""Task MR."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.engine import Engine, ProgramType
@@ -37,12 +37,12 @@ class MR(Engine):
name: str,
main_class: str,
main_package: str,
- program_type: Optional[ProgramType] = ProgramType.SCALA,
- app_name: Optional[str] = None,
- main_args: Optional[str] = None,
- others: Optional[str] = None,
+ program_type: ProgramType | None = ProgramType.SCALA,
+ app_name: str | None = None,
+ main_args: str | None = None,
+ others: str | None = None,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(
name, TaskType.MR, main_class, main_package, program_type, *args,
**kwargs
diff --git a/src/pydolphinscheduler/tasks/mlflow.py
b/src/pydolphinscheduler/tasks/mlflow.py
index e86797a..f61fc86 100644
--- a/src/pydolphinscheduler/tasks/mlflow.py
+++ b/src/pydolphinscheduler/tasks/mlflow.py
@@ -16,8 +16,9 @@
# under the License.
"""Task mlflow."""
+from __future__ import annotations
+
from copy import deepcopy
-from typing import Dict, Optional
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
@@ -66,7 +67,7 @@ class BaseMLflow(Task):
self.mlflow_tracking_uri = mlflow_tracking_uri
@property
- def task_params(self) -> Dict:
+ def task_params(self) -> dict:
"""Return task params."""
self._task_custom_attr = deepcopy(self._task_custom_attr)
self._task_custom_attr.update(self._child_task_mlflow_attr)
@@ -98,11 +99,11 @@ class MLflowModels(BaseMLflow):
self,
name: str,
model_uri: str,
- mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
- deploy_mode: Optional[str] = MLflowDeployType.DOCKER,
- port: Optional[int] = 7000,
+ mlflow_tracking_uri: str | None = DEFAULT_MLFLOW_TRACKING_URI,
+ deploy_mode: str | None = MLflowDeployType.DOCKER,
+ port: int | None = 7000,
*args,
- **kwargs
+ **kwargs,
):
"""Init mlflow models task."""
super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
@@ -140,12 +141,12 @@ class MLFlowProjectsCustom(BaseMLflow):
self,
name: str,
repository: str,
- mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
- experiment_name: Optional[str] = "",
- parameters: Optional[str] = "",
- version: Optional[str] = "master",
+ mlflow_tracking_uri: str | None = DEFAULT_MLFLOW_TRACKING_URI,
+ experiment_name: str | None = "",
+ parameters: str | None = "",
+ version: str | None = "master",
*args,
- **kwargs
+ **kwargs,
):
"""Init mlflow projects task."""
super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
@@ -185,13 +186,13 @@ class MLFlowProjectsAutoML(BaseMLflow):
self,
name: str,
data_path: str,
- automl_tool: Optional[str] = "flaml",
- mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
- experiment_name: Optional[str] = "",
- model_name: Optional[str] = "",
- parameters: Optional[str] = "",
+ automl_tool: str | None = "flaml",
+ mlflow_tracking_uri: str | None = DEFAULT_MLFLOW_TRACKING_URI,
+ experiment_name: str | None = "",
+ model_name: str | None = "",
+ parameters: str | None = "",
*args,
- **kwargs
+ **kwargs,
):
"""Init mlflow projects task."""
super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
@@ -236,14 +237,14 @@ class MLFlowProjectsBasicAlgorithm(BaseMLflow):
self,
name: str,
data_path: str,
- algorithm: Optional[str] = "lightgbm",
- mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
- experiment_name: Optional[str] = "",
- model_name: Optional[str] = "",
- parameters: Optional[str] = "",
- search_params: Optional[str] = "",
+ algorithm: str | None = "lightgbm",
+ mlflow_tracking_uri: str | None = DEFAULT_MLFLOW_TRACKING_URI,
+ experiment_name: str | None = "",
+ model_name: str | None = "",
+ parameters: str | None = "",
+ search_params: str | None = "",
*args,
- **kwargs
+ **kwargs,
):
"""Init mlflow projects task."""
super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
diff --git a/src/pydolphinscheduler/tasks/procedure.py
b/src/pydolphinscheduler/tasks/procedure.py
index 5a9d87b..807196a 100644
--- a/src/pydolphinscheduler/tasks/procedure.py
+++ b/src/pydolphinscheduler/tasks/procedure.py
@@ -17,7 +17,7 @@
"""Task procedure."""
-from typing import Dict, Optional
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
@@ -47,9 +47,9 @@ class Procedure(Task):
name: str,
datasource_name: str,
method: str,
- datasource_type: Optional[str] = None,
+ datasource_type: str | None = None,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(name, TaskType.PROCEDURE, *args, **kwargs)
self.datasource_name = datasource_name
@@ -57,7 +57,7 @@ class Procedure(Task):
self.method = method
@property
- def datasource(self) -> Dict:
+ def datasource(self) -> dict:
"""Get datasource for procedure task."""
datasource_task_u = Datasource.get_task_usage_4j(
self.datasource_name, self.datasource_type
@@ -68,7 +68,7 @@ class Procedure(Task):
}
@property
- def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
dict:
"""Override Task.task_params for produce task.
produce task have some specials attribute for task_params, and is odd
if we
diff --git a/src/pydolphinscheduler/tasks/python.py
b/src/pydolphinscheduler/tasks/python.py
index 93cec00..8e2729f 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -17,11 +17,12 @@
"""Task Python."""
+from __future__ import annotations
+
import logging
import re
import types
from pathlib import Path
-from typing import Union
from stmdency.extractor import Extractor
@@ -61,10 +62,10 @@ class Python(WorkerResourceMixin, Task):
_task_custom_attr = {"raw_script"}
ext: set = {".py"}
- ext_attr: Union[str, types.FunctionType] = "_definition"
+ ext_attr: str | types.FunctionType = "_definition"
def __init__(
- self, name: str, definition: Union[str, types.FunctionType], *args,
**kwargs
+ self, name: str, definition: str | types.FunctionType, *args, **kwargs
):
self._definition = definition
super().__init__(name, TaskType.PYTHON, *args, **kwargs)
diff --git a/src/pydolphinscheduler/tasks/pytorch.py
b/src/pydolphinscheduler/tasks/pytorch.py
index 8ae371f..0ba0d7d 100644
--- a/src/pydolphinscheduler/tasks/pytorch.py
+++ b/src/pydolphinscheduler/tasks/pytorch.py
@@ -16,7 +16,7 @@
# under the License.
"""Task Pytorch."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.mixin import WorkerResourceMixin
@@ -65,12 +65,12 @@ class Pytorch(WorkerResourceMixin, Task):
name: str,
script: str,
script_params: str = "",
- project_path: Optional[str] = DEFAULT.project_path,
- is_create_environment: Optional[bool] = DEFAULT.is_create_environment,
- python_command: Optional[str] = DEFAULT.python_command,
- python_env_tool: Optional[str] = "conda",
- requirements: Optional[str] = "requirements.txt",
- conda_python_version: Optional[str] = "3.7",
+ project_path: str | None = DEFAULT.project_path,
+ is_create_environment: bool | None = DEFAULT.is_create_environment,
+ python_command: str | None = DEFAULT.python_command,
+ python_env_tool: str | None = "conda",
+ requirements: str | None = "requirements.txt",
+ conda_python_version: str | None = "3.7",
*args,
**kwargs,
):
diff --git a/src/pydolphinscheduler/tasks/spark.py
b/src/pydolphinscheduler/tasks/spark.py
index eb9c621..93adb6b 100644
--- a/src/pydolphinscheduler/tasks/spark.py
+++ b/src/pydolphinscheduler/tasks/spark.py
@@ -17,7 +17,7 @@
"""Task Spark."""
-from typing import Optional
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.engine import Engine, ProgramType
@@ -51,18 +51,18 @@ class Spark(Engine):
name: str,
main_class: str,
main_package: str,
- program_type: Optional[ProgramType] = ProgramType.SCALA,
- deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
- app_name: Optional[str] = None,
- driver_cores: Optional[int] = 1,
- driver_memory: Optional[str] = "512M",
- num_executors: Optional[int] = 2,
- executor_memory: Optional[str] = "2G",
- executor_cores: Optional[int] = 2,
- main_args: Optional[str] = None,
- others: Optional[str] = None,
+ program_type: ProgramType | None = ProgramType.SCALA,
+ deploy_mode: DeployMode | None = DeployMode.CLUSTER,
+ app_name: str | None = None,
+ driver_cores: int | None = 1,
+ driver_memory: str | None = "512M",
+ num_executors: int | None = 2,
+ executor_memory: str | None = "2G",
+ executor_cores: int | None = 2,
+ main_args: str | None = None,
+ others: str | None = None,
*args,
- **kwargs
+ **kwargs,
):
super().__init__(
name,
@@ -71,7 +71,7 @@ class Spark(Engine):
main_package,
program_type,
*args,
- **kwargs
+ **kwargs,
)
self.deploy_mode = deploy_mode
self.app_name = app_name
diff --git a/src/pydolphinscheduler/tasks/sql.py
b/src/pydolphinscheduler/tasks/sql.py
index a320d43..520c291 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -16,10 +16,11 @@
# under the License.
"""Task sql."""
+from __future__ import annotations
import logging
import re
-from typing import Dict, List, Optional, Sequence, Union
+from collections.abc import Sequence
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
@@ -79,13 +80,13 @@ class Sql(Task):
name: str,
datasource_name: str,
sql: str,
- datasource_type: Optional[str] = None,
- sql_type: Optional[str] = None,
- pre_statements: Union[str, Sequence[str], None] = None,
- post_statements: Union[str, Sequence[str], None] = None,
- display_rows: Optional[int] = 10,
+ datasource_type: str | None = None,
+ sql_type: str | None = None,
+ pre_statements: str | Sequence[str] | None = None,
+ post_statements: str | Sequence[str] | None = None,
+ display_rows: int | None = 10,
*args,
- **kwargs
+ **kwargs,
):
self._sql = sql
super().__init__(name, TaskType.SQL, *args, **kwargs)
@@ -97,7 +98,7 @@ class Sql(Task):
self.display_rows = display_rows
@staticmethod
- def get_stm_list(stm: Union[str, Sequence[str]]) -> List[str]:
+ def get_stm_list(stm: str | Sequence[str]) -> list[str]:
"""Convert statement to str of list.
:param stm: statements string
@@ -137,7 +138,7 @@ class Sql(Task):
return SqlType.SELECT
@property
- def datasource(self) -> Dict:
+ def datasource(self) -> dict:
"""Get datasource for procedure sql."""
datasource_task_u = Datasource.get_task_usage_4j(
self.datasource_name, self.datasource_type
@@ -148,7 +149,7 @@ class Sql(Task):
}
@property
- def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
dict:
"""Override Task.task_params for sql task.
sql task have some specials attribute for task_params, and is odd if we
diff --git a/src/pydolphinscheduler/tasks/sub_workflow.py
b/src/pydolphinscheduler/tasks/sub_workflow.py
index 1cb579c..715a21e 100644
--- a/src/pydolphinscheduler/tasks/sub_workflow.py
+++ b/src/pydolphinscheduler/tasks/sub_workflow.py
@@ -17,7 +17,7 @@
"""Task sub workflow."""
-from typing import Dict
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
@@ -43,7 +43,7 @@ class SubWorkflow(Task):
"""
return self.get_workflow_info(self.workflow_name).get("code")
- def get_workflow_info(self, workflow_name: str) -> Dict:
+ def get_workflow_info(self, workflow_name: str) -> dict:
"""Get workflow info from java gateway, contains workflow id, name,
code."""
if not self.workflow:
raise PyDSWorkflowNotAssignException(
diff --git a/src/pydolphinscheduler/tasks/switch.py
b/src/pydolphinscheduler/tasks/switch.py
index acc23ba..b0969af 100644
--- a/src/pydolphinscheduler/tasks/switch.py
+++ b/src/pydolphinscheduler/tasks/switch.py
@@ -16,8 +16,7 @@
# under the License.
"""Task Switch."""
-
-from typing import Dict, Optional
+from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
@@ -35,7 +34,7 @@ class SwitchBranch(Base):
"next_node",
}
- def __init__(self, task: Task, exp: Optional[str] = None):
+ def __init__(self, task: Task, exp: str | None = None):
super().__init__(f"Switch.{self.__class__.__name__.upper()}")
self.task = task
self.exp = exp
@@ -46,11 +45,11 @@ class SwitchBranch(Base):
return self.task.code
@property
- def condition(self) -> Optional[str]:
+ def condition(self) -> str | None:
"""Get task switch property condition."""
return self.exp
- def get_define(self, camel_attr: bool = True) -> Dict:
+ def get_define(self, camel_attr: bool = True) -> dict:
"""Get :class:`ConditionBranch` definition attribute communicate to
Java gateway server."""
if self.condition:
self._DEFINE_ATTR.add("condition")
@@ -121,7 +120,7 @@ class SwitchCondition(Base):
setattr(self, "next_node", "")
setattr(self, "depend_task_list", result)
- def get_define(self, camel_attr=True) -> Dict:
+ def get_define(self, camel_attr=True) -> dict:
"""Overwrite Base.get_define to get task Condition specific get
define."""
self.set_define_attr()
return super().get_define()
@@ -154,7 +153,7 @@ class Switch(Task):
self.set_downstream(downstream)
@property
- def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
dict:
"""Override Task.task_params for switch task.
switch task have some specials attribute `switch`, and in most of the
task
diff --git a/src/pydolphinscheduler/utils/file.py
b/src/pydolphinscheduler/utils/file.py
index 075b902..740f8ba 100644
--- a/src/pydolphinscheduler/utils/file.py
+++ b/src/pydolphinscheduler/utils/file.py
@@ -17,15 +17,16 @@
"""File util for pydolphinscheduler."""
+from __future__ import annotations
+
from pathlib import Path
-from typing import Optional
def write(
content: str,
to_path: str,
- create: Optional[bool] = True,
- overwrite: Optional[bool] = False,
+ create: bool | None = True,
+ overwrite: bool | None = False,
) -> None:
"""Write configs dict to configuration file.
diff --git a/src/pydolphinscheduler/utils/yaml_parser.py
b/src/pydolphinscheduler/utils/yaml_parser.py
index 46ee08c..a79dfec 100644
--- a/src/pydolphinscheduler/utils/yaml_parser.py
+++ b/src/pydolphinscheduler/utils/yaml_parser.py
@@ -16,10 +16,11 @@
# under the License.
"""YAML parser utils, parser yaml string to ``ruamel.yaml`` object and nested
key dict."""
+from __future__ import annotations
import copy
import io
-from typing import Any, Dict, Optional
+from typing import Any
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap
@@ -58,7 +59,7 @@ class YamlParser:
yaml_parser["one.two2"] = "value4"
"""
- def __init__(self, content: str, delimiter: Optional[str] = "."):
+ def __init__(self, content: str, delimiter: str | None = "."):
self._content = content
self.src_parser = content
self._delimiter = delimiter
@@ -75,7 +76,7 @@ class YamlParser:
self._src_parser = self._yaml.load(content)
def parse_nested_dict(
- self, result: Dict, commented_map: CommentedMap, key: str
+ self, result: dict, commented_map: CommentedMap, key: str
) -> None:
"""Parse :class:`ruamel.yaml.comments.CommentedMap` to nested dict
using :param:`delimiter`."""
if not isinstance(commented_map, CommentedMap):
@@ -86,7 +87,7 @@ class YamlParser:
self.parse_nested_dict(result, commented_map[sub_key], next_key)
@property
- def dict_parser(self) -> Dict:
+ def dict_parser(self) -> dict:
"""Get :class:`CommentedMap` to nested dict using :param:`delimiter`
as key delimiter.
Use Depth-First-Search get all nested key and value, and all key
connect by :param:`delimiter`.
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 3e62157..8b7d2a0 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -16,11 +16,12 @@
# under the License.
"""Test Task class function."""
+from __future__ import annotations
+
import logging
import re
import warnings
from datetime import timedelta
-from typing import Set, Tuple, Union
from unittest.mock import PropertyMock, patch
import pytest
@@ -92,7 +93,7 @@ TEST_TASK_RELATION_SIZE = 0
),
],
)
-def test__get_attr(addition: Set, ignore: Set, expect: Set):
+def test__get_attr(addition: set, ignore: set, expect: set):
"""Test task function `_get_attr`."""
task = TestTask(
name="test-get-attr",
@@ -117,7 +118,7 @@ def test__get_attr(addition: Set, ignore: Set, expect: Set):
(None, (0, "CLOSE")),
],
)
-def test_task_timeout(value: Union[timedelta, int], expect: Tuple[int, str]):
+def test_task_timeout(value: timedelta | int, expect: tuple[int, str]):
"""Test task timout attribute."""
task = TestTask(
name="test-get-attr",
diff --git a/tests/core/test_workflow.py b/tests/core/test_workflow.py
index 3e9fbd0..66748ed 100644
--- a/tests/core/test_workflow.py
+++ b/tests/core/test_workflow.py
@@ -16,9 +16,11 @@
# under the License.
"""Test workflow."""
+from __future__ import annotations
+
import warnings
from datetime import datetime, timedelta
-from typing import Any, List, Union
+from typing import Any
from unittest.mock import patch
import pytest
@@ -93,7 +95,7 @@ def test_workflow_default_value(name, value):
("param", dict, {"key": "value"}),
(
"resource_list",
- List,
+ list,
[Resource(name="/dev/test.py", content="hello world",
description="desc")],
),
],
@@ -117,7 +119,7 @@ def test_set_attr(name, cls, expect):
(timedelta(seconds=360), 6),
],
)
-def test_workflow_timeout(value: Union[timedelta, int], expect: int):
+def test_workflow_timeout(value: timedelta | int, expect: int):
"""Test workflow timout attribute."""
with Workflow(TEST_WORKFLOW_NAME, timeout=value) as workflow:
assert (
diff --git a/tests/integration/test_process_definition.py
b/tests/integration/test_process_definition.py
index 010ca76..42aa96a 100644
--- a/tests/integration/test_process_definition.py
+++ b/tests/integration/test_process_definition.py
@@ -16,8 +16,7 @@
# under the License.
"""Test workflow in integration."""
-
-from typing import Dict
+from __future__ import annotations
import pytest
@@ -41,7 +40,7 @@ TASK_NAME = f"task_{WORKFLOW_NAME}"
)
],
)
-def test_change_workflow_attr(pre: Dict, post: Dict):
+def test_change_workflow_attr(pre: dict, post: dict):
"""Test whether workflow success when specific attribute change."""
assert pre.keys() == post.keys(), "Not equal keys for pre and post
attribute."
for attrs in [pre, post]:
diff --git a/tests/tasks/test_condition.py b/tests/tasks/test_condition.py
index 217c727..41dbdb2 100644
--- a/tests/tasks/test_condition.py
+++ b/tests/tasks/test_condition.py
@@ -16,7 +16,8 @@
# under the License.
"""Test Task dependent."""
-from typing import List, Tuple
+from __future__ import annotations
+
from unittest.mock import patch
import pytest
@@ -64,7 +65,7 @@ def test_class_status_status_name(obj: Status, expect: str):
(FAILURE, (ConditionOperator(1), ConditionOperator(2),
ConditionOperator(3))),
],
)
-def test_class_status_depend_item_list_no_expect_type(obj: Status, tasks:
Tuple):
+def test_class_status_depend_item_list_no_expect_type(obj: Status, tasks:
tuple):
"""Test class status and sub class raise error when assign not support
type."""
with pytest.raises(
PyDSParamException, match=".*?only accept class Task or sub class
Task, but get"
@@ -86,7 +87,7 @@ def test_class_status_depend_item_list_no_expect_type(obj:
Status, tasks: Tuple)
(FAILURE, [Task(str(i), TEST_TYPE) for i in range(3)]),
],
)
-def test_class_status_depend_item_list(obj: Status, tasks: Tuple):
+def test_class_status_depend_item_list(obj: Status, tasks: tuple):
"""Test class status and sub class function :func:`depend_item_list`."""
status = obj.status_name()
expect = [
@@ -218,7 +219,7 @@ def test_condition_operator_set_define_attr_status(
],
)
def test_condition_operator_set_define_attr_mix_status(
- obj: ConditionOperator, status: List[Status]
+ obj: ConditionOperator, status: list[Status]
):
"""Test :func:`set_define_attr` with one or more mixed status."""
attr = "depend_item_list"
@@ -288,7 +289,7 @@ def test_condition_operator_set_define_attr_operator(
],
)
def test_condition_operator_set_define_attr_mix_operator(
- cond: ConditionOperator, sub_cond: Tuple[ConditionOperator]
+ cond: ConditionOperator, sub_cond: tuple[ConditionOperator]
):
"""Test :func:`set_define_attr` with one or more class mix condition
operator."""
attr = "depend_task_list"
diff --git a/tests/tasks/test_dependent.py b/tests/tasks/test_dependent.py
index e35ca87..4be4501 100644
--- a/tests/tasks/test_dependent.py
+++ b/tests/tasks/test_dependent.py
@@ -16,9 +16,10 @@
# under the License.
"""Test Task dependent."""
+from __future__ import annotations
+
import itertools
import warnings
-from typing import Dict, List, Optional, Tuple, Union
from unittest.mock import patch
import pytest
@@ -162,7 +163,7 @@ def test_dependent_item_date_error():
({}, None),
],
)
-def test_dependent_item_code_parameter(task_name: dict, result: Optional[str]):
+def test_dependent_item_code_parameter(task_name: dict, result: str | None):
"""Test dependent item property code_parameter."""
dependent_item = DependentItem(
project_name=TEST_PROJECT,
@@ -358,9 +359,9 @@ def test_dependent_operator_set_define_error(mock_code,
arg_list):
)
def test_operator_dependent_item(
mock_code_info,
- operators: Tuple[DependentOperator],
- kwargs: Tuple[dict],
- expect: List[Dict],
+ operators: tuple[DependentOperator],
+ kwargs: tuple[dict],
+ expect: list[dict],
):
"""Test DependentOperator(DependentItem) function get_define.
@@ -557,9 +558,9 @@ def test_operator_dependent_item(
)
def test_operator_dependent_task_list_multi_dependent_item(
mock_code_info,
- operators: Tuple[DependentOperator],
- args: Tuple[Union[Tuple, dict]],
- expect: List[Dict],
+ operators: tuple[DependentOperator],
+ args: tuple[tuple | dict],
+ expect: list[dict],
):
"""Test DependentOperator(DependentOperator(DependentItem)) single
operator function get_define.
@@ -687,9 +688,9 @@ def get_dep_task_list(*operator):
)
def test_operator_dependent_task_list_multi_dependent_list(
mock_code_info,
- operators: Tuple[DependentOperator],
- args: Tuple[Union[Tuple, dict]],
- expect: List[Dict],
+ operators: tuple[DependentOperator],
+ args: tuple[tuple | dict],
+ expect: list[dict],
):
"""Test DependentOperator(DependentOperator(DependentItem)) multiply
operator function get_define.
diff --git a/tests/tasks/test_switch.py b/tests/tasks/test_switch.py
index dcd5e40..e9a24bf 100644
--- a/tests/tasks/test_switch.py
+++ b/tests/tasks/test_switch.py
@@ -16,8 +16,8 @@
# under the License.
"""Test Task switch."""
+from __future__ import annotations
-from typing import Optional, Tuple
from unittest.mock import patch
import pytest
@@ -37,7 +37,7 @@ TEST_NAME = "test-task"
TEST_TYPE = "test-type"
-def task_switch_arg_wrapper(obj, task: Task, exp: Optional[str] = None) ->
SwitchBranch:
+def task_switch_arg_wrapper(obj, task: Task, exp: str | None = None) ->
SwitchBranch:
"""Wrap task switch and its subclass."""
if obj is Default:
return obj(task)
@@ -121,7 +121,7 @@ def test_switch_branch_get_define_condition(obj:
SwitchBranch):
),
],
)
-def test_switch_condition_set_define_attr_error(args: Tuple, msg: str):
+def test_switch_condition_set_define_attr_error(args: tuple, msg: str):
"""Test error case on :class:`SwitchCondition`."""
switch_condition = SwitchCondition(*args)
with pytest.raises(PyDSParamException, match=msg):
diff --git a/tests/testing/docker_wrapper.py b/tests/testing/docker_wrapper.py
index a3d0b6e..70ca98d 100644
--- a/tests/testing/docker_wrapper.py
+++ b/tests/testing/docker_wrapper.py
@@ -16,9 +16,9 @@
# under the License.
"""Wrap docker commands for easier create docker container."""
+from __future__ import annotations
import time
-from typing import Optional
import docker
from docker.errors import ImageNotFound
@@ -50,7 +50,7 @@ class DockerWrapper:
)
def run_until_log(
- self, log: str, remove_exists: Optional[bool] = True, *args, **kwargs
+ self, log: str, remove_exists: bool | None = True, *args, **kwargs
) -> Container:
"""Create and run a new container, return when specific log appear.
diff --git a/tests/testing/file.py b/tests/testing/file.py
index 9816b94..67fd4ba 100644
--- a/tests/testing/file.py
+++ b/tests/testing/file.py
@@ -16,18 +16,18 @@
# under the License.
"""Testing util about file operating."""
+from __future__ import annotations
from pathlib import Path
-from typing import Union
-def get_file_content(path: Union[str, Path]) -> str:
+def get_file_content(path: str | Path) -> str:
"""Get file content in given path."""
with open(path) as f:
return f.read()
-def delete_file(path: Union[str, Path]) -> None:
+def delete_file(path: str | Path) -> None:
"""Delete file in given path."""
path = Path(path).expanduser() if isinstance(path, str) else
path.expanduser()
if path.exists():
diff --git a/tests/testing/path.py b/tests/testing/path.py
index 974ab3d..0359a23 100644
--- a/tests/testing/path.py
+++ b/tests/testing/path.py
@@ -16,9 +16,11 @@
# under the License.
"""Handle path related issue in test module."""
+from __future__ import annotations
+from collections.abc import Generator
from pathlib import Path
-from typing import Any, Generator
+from typing import Any
project_root = Path(__file__).parent.parent.parent
diff --git a/tests/utils/test_date.py b/tests/utils/test_date.py
index 6546c4a..9599d61 100644
--- a/tests/utils/test_date.py
+++ b/tests/utils/test_date.py
@@ -16,9 +16,9 @@
# under the License.
"""Test utils.date module."""
+from __future__ import annotations
from datetime import datetime, timedelta
-from typing import Dict
import pytest
@@ -97,7 +97,7 @@ def test_conv_from_str_not_impl(src: str) -> None:
({"hours": 1.3}, 78),
],
)
-def test_timedelta2timeout(src: Dict, expect: int) -> None:
+def test_timedelta2timeout(src: dict, expect: int) -> None:
"""Test function timedelta2timeout."""
td = timedelta(**src)
assert timedelta2timeout(td) == expect, f"Test case {td} not expect to
{expect}."
diff --git a/tests/utils/test_yaml_parser.py b/tests/utils/test_yaml_parser.py
index 6f084ba..c017067 100644
--- a/tests/utils/test_yaml_parser.py
+++ b/tests/utils/test_yaml_parser.py
@@ -16,8 +16,7 @@
# under the License.
"""Test utils.path_dict module."""
-
-from typing import Dict
+from __future__ import annotations
import pytest
from ruamel.yaml import YAML
@@ -99,7 +98,7 @@ with open(path_default_config_yaml) as f:
),
],
)
-def test_yaml_parser_specific_delimiter(src: str, delimiter: str, expect:
Dict):
+def test_yaml_parser_specific_delimiter(src: str, delimiter: str, expect:
dict):
"""Test specific delimiter for :class:`YamlParser`."""
def ch_dl(key):
@@ -135,7 +134,7 @@ def test_yaml_parser_specific_delimiter(src: str,
delimiter: str, expect: Dict):
),
],
)
-def test_yaml_parser_contains(src: str, expect: Dict):
+def test_yaml_parser_contains(src: str, expect: dict):
"""Test magic function :func:`YamlParser.__contain__` also with `key in
obj` syntax."""
yaml_parser = YamlParser(src)
assert len(expect.keys()) == len(
@@ -159,7 +158,7 @@ def test_yaml_parser_contains(src: str, expect: Dict):
),
],
)
-def test_yaml_parser_get(src: str, expect: Dict):
+def test_yaml_parser_get(src: str, expect: dict):
"""Test magic function :func:`YamlParser.__getitem__` also with `obj[key]`
syntax."""
yaml_parser = YamlParser(src)
assert all(
@@ -191,7 +190,7 @@ def test_yaml_parser_get(src: str, expect: Dict):
),
],
)
-def test_yaml_parser_set(src: str, expect: Dict):
+def test_yaml_parser_set(src: str, expect: dict):
"""Test magic function :func:`YamlParser.__setitem__` also with `obj[key]
= val` syntax."""
yaml_parser = YamlParser(src)
for key in expect:
@@ -241,7 +240,7 @@ name:
),
],
)
-def test_yaml_parser_str_repr(src: str, setter: Dict, expect: str):
+def test_yaml_parser_str_repr(src: str, setter: dict, expect: str):
"""Test function :func:`YamlParser.to_string`."""
yaml_parser = YamlParser(src)