This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new e1d1527  feat: Apply PEP-563 for codebase (#128)
e1d1527 is described below

commit e1d15274a6ffc2cf2ee3236d3ba3e8a0b1cf70d3
Author: Jay Chung <[email protected]>
AuthorDate: Mon Dec 25 18:35:00 2023 +0800

    feat: Apply PEP-563 for codebase (#128)
---
 .ruff.toml                                         |  2 +
 setup.py                                           |  5 +-
 src/pydolphinscheduler/core/engine.py              |  8 +-
 src/pydolphinscheduler/core/resource.py            |  8 +-
 src/pydolphinscheduler/core/task.py                | 97 +++++++++++-----------
 src/pydolphinscheduler/core/workflow.py            | 70 ++++++++--------
 src/pydolphinscheduler/core/yaml_workflow.py       |  6 +-
 src/pydolphinscheduler/java_gateway.py             | 29 +++----
 src/pydolphinscheduler/models/base.py              | 10 +--
 src/pydolphinscheduler/models/base_side.py         |  4 +-
 src/pydolphinscheduler/models/datasource.py        | 21 +++--
 src/pydolphinscheduler/models/meta.py              |  6 +-
 src/pydolphinscheduler/models/project.py           |  8 +-
 src/pydolphinscheduler/models/queue.py             |  4 +-
 src/pydolphinscheduler/models/tenant.py            | 12 +--
 src/pydolphinscheduler/models/user.py              | 18 ++--
 src/pydolphinscheduler/models/worker_group.py      |  4 +-
 .../resources_plugin/base/bucket.py                | 23 ++---
 .../resources_plugin/base/git.py                   | 38 +++++----
 src/pydolphinscheduler/resources_plugin/github.py  |  9 +-
 src/pydolphinscheduler/resources_plugin/gitlab.py  | 11 +--
 src/pydolphinscheduler/resources_plugin/oss.py     |  9 +-
 src/pydolphinscheduler/resources_plugin/s3.py      | 11 +--
 src/pydolphinscheduler/tasks/condition.py          | 10 +--
 src/pydolphinscheduler/tasks/datax.py              | 32 +++----
 src/pydolphinscheduler/tasks/dependent.py          | 16 ++--
 src/pydolphinscheduler/tasks/dvc.py                |  9 +-
 src/pydolphinscheduler/tasks/flink.py              | 28 +++----
 src/pydolphinscheduler/tasks/http.py               | 16 ++--
 src/pydolphinscheduler/tasks/map_reduce.py         | 12 +--
 src/pydolphinscheduler/tasks/mlflow.py             | 49 +++++------
 src/pydolphinscheduler/tasks/procedure.py          | 10 +--
 src/pydolphinscheduler/tasks/python.py             |  7 +-
 src/pydolphinscheduler/tasks/pytorch.py            | 14 ++--
 src/pydolphinscheduler/tasks/spark.py              | 26 +++---
 src/pydolphinscheduler/tasks/sql.py                | 21 ++---
 src/pydolphinscheduler/tasks/sub_workflow.py       |  4 +-
 src/pydolphinscheduler/tasks/switch.py             | 13 ++-
 src/pydolphinscheduler/utils/file.py               |  7 +-
 src/pydolphinscheduler/utils/yaml_parser.py        |  9 +-
 tests/core/test_task.py                            |  7 +-
 tests/core/test_workflow.py                        |  8 +-
 tests/integration/test_process_definition.py       |  5 +-
 tests/tasks/test_condition.py                      | 11 +--
 tests/tasks/test_dependent.py                      | 23 ++---
 tests/tasks/test_switch.py                         |  6 +-
 tests/testing/docker_wrapper.py                    |  4 +-
 tests/testing/file.py                              |  6 +-
 tests/testing/path.py                              |  4 +-
 tests/utils/test_date.py                           |  4 +-
 tests/utils/test_yaml_parser.py                    | 13 ++-
 51 files changed, 404 insertions(+), 383 deletions(-)

diff --git a/.ruff.toml b/.ruff.toml
index 4e42e71..7ddd14b 100644
--- a/.ruff.toml
+++ b/.ruff.toml
@@ -1,5 +1,7 @@
 src = ["src"]
 
+target-version = "py38"
+
 # max-line-length = 110
 line-length = 110
 
diff --git a/setup.py b/setup.py
index 7a75bc8..a56b672 100644
--- a/setup.py
+++ b/setup.py
@@ -16,13 +16,14 @@
 # under the License.
 
 """The script for setting up pydolphinscheduler."""
+from __future__ import annotations
+
 import logging
 import os
 import sys
 from distutils.command.sdist import sdist
 from distutils.dir_util import remove_tree
 from distutils.errors import DistutilsExecError
-from typing import List
 
 from setuptools import Command, setup
 
@@ -38,7 +39,7 @@ class CleanCommand(Command):
     """Command to clean up python api before setup by running `python setup.py 
clean`."""
 
     description = "Clean up project root"
-    user_options: List[str] = []
+    user_options: list[str] = []
     clean_list = [
         "build",
         "htmlcov",
diff --git a/src/pydolphinscheduler/core/engine.py 
b/src/pydolphinscheduler/core/engine.py
index 8cf8469..3e36820 100644
--- a/src/pydolphinscheduler/core/engine.py
+++ b/src/pydolphinscheduler/core/engine.py
@@ -17,7 +17,7 @@
 
 """Module engine."""
 
-from typing import Dict, Optional
+from __future__ import annotations
 
 from py4j.protocol import Py4JJavaError
 
@@ -47,9 +47,9 @@ class Engine(Task):
         task_type: str,
         main_class: str,
         main_package: str,
-        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        program_type: ProgramType | None = ProgramType.SCALA,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(name, task_type, *args, **kwargs)
         self.main_class = main_class
@@ -76,7 +76,7 @@ class Engine(Task):
         return self.get_resource_info(self.program_type, 
self.main_package).get("id")
 
     @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
Dict:
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
dict:
         """Override Task.task_params for engine children task.
 
         children task have some specials attribute for task_params, and is odd 
if we
diff --git a/src/pydolphinscheduler/core/resource.py 
b/src/pydolphinscheduler/core/resource.py
index 907114f..09b0b88 100644
--- a/src/pydolphinscheduler/core/resource.py
+++ b/src/pydolphinscheduler/core/resource.py
@@ -17,7 +17,7 @@
 
 """Module resource."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.java_gateway import gateway
@@ -38,9 +38,9 @@ class Resource(Base):
     def __init__(
         self,
         name: str,
-        content: Optional[str] = None,
-        description: Optional[str] = None,
-        user_name: Optional[str] = None,
+        content: str | None = None,
+        description: str | None = None,
+        user_name: str | None = None,
     ):
         super().__init__(name, description)
         self.content = content
diff --git a/src/pydolphinscheduler/core/task.py 
b/src/pydolphinscheduler/core/task.py
index 5f9fccb..ac48ab8 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -16,12 +16,15 @@
 # under the License.
 
 """DolphinScheduler Task and TaskRelation object."""
+
+from __future__ import annotations
+
 import copy
 import types
 import warnings
+from collections.abc import Sequence
 from datetime import timedelta
 from logging import getLogger
-from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
 
 from pydolphinscheduler import configuration
 from pydolphinscheduler.constants import (
@@ -72,7 +75,7 @@ class TaskRelation(Base):
         self,
         pre_task_code: int,
         post_task_code: int,
-        name: Optional[str] = None,
+        name: str | None = None,
     ):
         super().__init__(name)
         self.pre_task_code = pre_task_code
@@ -147,7 +150,7 @@ class Task(Base):
     _task_custom_attr: set = set()
 
     ext: set = None
-    ext_attr: Union[str, types.FunctionType] = None
+    ext_attr: str | types.FunctionType = None
 
     DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
 
@@ -155,27 +158,27 @@ class Task(Base):
         self,
         name: str,
         task_type: str,
-        description: Optional[str] = None,
-        flag: Optional[str] = TaskFlag.YES,
-        task_priority: Optional[str] = TaskPriority.MEDIUM,
-        worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
-        environment_name: Optional[str] = None,
-        task_group_id: Optional[int] = 0,
-        task_group_priority: Optional[int] = 0,
-        delay_time: Optional[int] = 0,
-        fail_retry_times: Optional[int] = 0,
-        fail_retry_interval: Optional[int] = 1,
-        timeout_notify_strategy: Optional = None,
-        timeout: Optional[Union[timedelta, int]] = None,
-        workflow: Optional[Workflow] = None,
-        resource_list: Optional[List] = None,
-        dependence: Optional[Dict] = None,
-        wait_start_timeout: Optional[Dict] = None,
-        condition_result: Optional[Dict] = None,
-        resource_plugin: Optional[ResourcePlugin] = None,
-        is_cache: Optional[bool] = False,
-        input_params: Optional[Dict] = None,
-        output_params: Optional[Dict] = None,
+        description: str | None = None,
+        flag: str | None = TaskFlag.YES,
+        task_priority: str | None = TaskPriority.MEDIUM,
+        worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
+        environment_name: str | None = None,
+        task_group_id: int | None = 0,
+        task_group_priority: int | None = 0,
+        delay_time: int | None = 0,
+        fail_retry_times: int | None = 0,
+        fail_retry_interval: int | None = 1,
+        timeout_notify_strategy: str | None = None,
+        timeout: timedelta | int | None = None,
+        workflow: Workflow | None = None,
+        resource_list: list | None = None,
+        dependence: dict | None = None,
+        wait_start_timeout: dict | None = None,
+        condition_result: dict | None = None,
+        resource_plugin: ResourcePlugin | None = None,
+        is_cache: bool | None = False,
+        input_params: dict | None = None,
+        output_params: dict | None = None,
         *args,
         **kwargs,
     ):
@@ -192,7 +195,7 @@ class Task(Base):
         self.fail_retry_interval = fail_retry_interval
         self.delay_time = delay_time
         self.timeout_notify_strategy = timeout_notify_strategy
-        self._timeout: Union[timedelta, int] = timeout
+        self._timeout: timedelta | int = timeout
         self._workflow = None
         self._input_params = input_params or {}
         self._output_params = output_params or {}
@@ -214,9 +217,9 @@ class Task(Base):
             )
             self._local_params = kwargs.get("local_params")
 
-        self._upstream_task_codes: Set[int] = set()
-        self._downstream_task_codes: Set[int] = set()
-        self._task_relation: Set[TaskRelation] = set()
+        self._upstream_task_codes: set[int] = set()
+        self._downstream_task_codes: set[int] = set()
+        self._task_relation: set[TaskRelation] = set()
         # move attribute code and version after _workflow and workflow declare
         self.code, self.version = self.gen_code_and_version()
         # Add task to workflow, maybe we could put into property workflow 
latter
@@ -238,12 +241,12 @@ class Task(Base):
         self.get_content()
 
     @property
-    def workflow(self) -> Optional[Workflow]:
+    def workflow(self) -> Workflow | None:
         """Get attribute workflow."""
         return self._workflow
 
     @workflow.setter
-    def workflow(self, workflow: Optional[Workflow]):
+    def workflow(self, workflow: Workflow | None):
         """Set attribute workflow."""
         self._workflow = workflow
 
@@ -270,7 +273,7 @@ class Task(Base):
             raise PyDSParamException("is_cache must be a bool")
 
     @property
-    def resource_list(self) -> List[Dict[str, Resource]]:
+    def resource_list(self) -> list[dict[str, Resource]]:
         """Get task define attribute `resource_list`."""
         resources = set()
         for res in self._resource_list:
@@ -292,7 +295,7 @@ class Task(Base):
         return [{ResourceKey.NAME: r} for r in resources]
 
     @property
-    def user_name(self) -> Optional[str]:
+    def user_name(self) -> str | None:
         """Return username of workflow."""
         if self.workflow:
             return self.workflow.user.name
@@ -300,16 +303,16 @@ class Task(Base):
             raise PyDSParamException("`user_name` cannot be empty.")
 
     @property
-    def condition_result(self) -> Dict:
+    def condition_result(self) -> dict:
         """Get attribute condition_result."""
         return self._condition_result
 
     @condition_result.setter
-    def condition_result(self, condition_result: Optional[Dict]):
+    def condition_result(self, condition_result: dict | None):
         """Set attribute condition_result."""
         self._condition_result = condition_result
 
-    def _get_attr(self) -> Set[str]:
+    def _get_attr(self) -> set[str]:
         """Get final task task_params attribute.
 
         Base on `_task_default_attr`, append attribute from 
`_task_custom_attr` and subtract attribute from
@@ -321,7 +324,7 @@ class Task(Base):
         return attr
 
     @property
-    def task_params(self) -> Optional[Dict]:
+    def task_params(self) -> dict | None:
         """Get task parameter object.
 
         Will get result to combine _task_custom_attr and custom_attr.
@@ -373,29 +376,27 @@ class Task(Base):
     def __hash__(self):
         return hash(self.code)
 
-    def __lshift__(self, other: Union["Task", Sequence["Task"]]):
+    def __lshift__(self, other: Task | Sequence[Task]):
         """Implement Task << Task."""
         self.set_upstream(other)
         return other
 
-    def __rshift__(self, other: Union["Task", Sequence["Task"]]):
+    def __rshift__(self, other: Task | Sequence[Task]):
         """Implement Task >> Task."""
         self.set_downstream(other)
         return other
 
-    def __rrshift__(self, other: Union["Task", Sequence["Task"]]):
+    def __rrshift__(self, other: Task | Sequence[Task]):
         """Call for Task >> [Task] because list don't have __rshift__ 
operators."""
         self.__lshift__(other)
         return self
 
-    def __rlshift__(self, other: Union["Task", Sequence["Task"]]):
+    def __rlshift__(self, other: Task | Sequence[Task]):
         """Call for Task << [Task] because list don't have __lshift__ 
operators."""
         self.__rshift__(other)
         return self
 
-    def _set_deps(
-        self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True
-    ) -> None:
+    def _set_deps(self, tasks: Task | Sequence[Task], upstream: bool = True) 
-> None:
         """Set parameter tasks dependent to current task.
 
         it is a wrapper for :func:`set_upstream` and :func:`set_downstream`.
@@ -427,16 +428,16 @@ class Task(Base):
                     )
                     self.workflow._task_relations.add(task_relation)
 
-    def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+    def set_upstream(self, tasks: Task | Sequence[Task]) -> None:
         """Set parameter tasks as upstream to current task."""
         self._set_deps(tasks, upstream=True)
 
-    def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+    def set_downstream(self, tasks: Task | Sequence[Task]) -> None:
         """Set parameter tasks as downstream to current task."""
         self._set_deps(tasks, upstream=False)
 
     # TODO code should better generate in bulk mode when :ref: workflow run 
submit or start
-    def gen_code_and_version(self) -> Tuple:
+    def gen_code_and_version(self) -> tuple:
         """Generate task code and version from java gateway.
 
         If task name do not exists in workflow before, if will generate new 
code and version id
@@ -474,7 +475,7 @@ class Task(Base):
     def add_in(
         self,
         name: str,
-        value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
+        value: int | str | float | bool | BaseDataType | None = None,
     ):
         """Add input parameters.
 
@@ -494,7 +495,7 @@ class Task(Base):
     def add_out(
         self,
         name: str,
-        value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
+        value: int | str | float | bool | BaseDataType | None = None,
     ):
         """Add output parameters.
 
diff --git a/src/pydolphinscheduler/core/workflow.py 
b/src/pydolphinscheduler/core/workflow.py
index 99d14ac..7fac15b 100644
--- a/src/pydolphinscheduler/core/workflow.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -17,9 +17,11 @@
 
 """Module workflow, core class for workflow define."""
 
+from __future__ import annotations
+
 import json
 from datetime import datetime, timedelta
-from typing import Any, Dict, List, Optional, Set, Union
+from typing import Any
 
 from pydolphinscheduler import configuration
 from pydolphinscheduler.constants import Symbol, TaskType
@@ -39,15 +41,15 @@ from pydolphinscheduler.utils.date import (
 class WorkflowContext:
     """Class workflow context, use when task get workflow from context 
expression."""
 
-    _context_managed_workflow: Optional["Workflow"] = None
+    _context_managed_workflow: Workflow | None = None
 
     @classmethod
-    def set(cls, workflow: "Workflow") -> None:
+    def set(cls, workflow: Workflow) -> None:
         """Set attribute self._context_managed_workflow."""
         cls._context_managed_workflow = workflow
 
     @classmethod
-    def get(cls) -> Optional["Workflow"]:
+    def get(cls) -> Workflow | None:
         """Get attribute self._context_managed_workflow."""
         return cls._context_managed_workflow
 
@@ -125,23 +127,23 @@ class Workflow(Base):
     def __init__(
         self,
         name: str,
-        description: Optional[str] = None,
-        schedule: Optional[str] = None,
-        online_schedule: Optional[bool] = None,
-        start_time: Optional[Union[str, datetime]] = None,
-        end_time: Optional[Union[str, datetime]] = None,
-        timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE,
-        user: Optional[str] = configuration.WORKFLOW_USER,
-        project: Optional[str] = configuration.WORKFLOW_PROJECT,
-        worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
-        warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
-        warning_group_id: Optional[int] = 0,
-        execution_type: Optional[str] = configuration.WORKFLOW_EXECUTION_TYPE,
-        timeout: Optional[Union[timedelta, int]] = 0,
-        release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
-        param: Optional[Dict] = None,
-        resource_plugin: Optional[ResourcePlugin] = None,
-        resource_list: Optional[List[Resource]] = None,
+        description: str | None = None,
+        schedule: str | None = None,
+        online_schedule: bool | None = None,
+        start_time: str | datetime | None = None,
+        end_time: str | datetime | None = None,
+        timezone: str | None = configuration.WORKFLOW_TIME_ZONE,
+        user: str | None = configuration.WORKFLOW_USER,
+        project: str | None = configuration.WORKFLOW_PROJECT,
+        worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
+        warning_type: str | None = configuration.WORKFLOW_WARNING_TYPE,
+        warning_group_id: int | None = 0,
+        execution_type: str | None = configuration.WORKFLOW_EXECUTION_TYPE,
+        timeout: timedelta | int | None = 0,
+        release_state: str | None = configuration.WORKFLOW_RELEASE_STATE,
+        param: dict | None = None,
+        resource_plugin: ResourcePlugin | None = None,
+        resource_list: list[Resource] | None = None,
         *args,
         **kwargs,
     ):
@@ -187,17 +189,17 @@ class Workflow(Base):
             )
         else:
             self._execution_type = execution_type
-        self._timeout: Union[timedelta, int] = timeout
+        self._timeout: timedelta | int = timeout
         self._release_state = release_state
         self.param = param
         self.tasks: dict = {}
         self.resource_plugin = resource_plugin
         # TODO how to fix circle import
-        self._task_relations: set["TaskRelation"] = set()  # noqa: F821
+        self._task_relations: set[TaskRelation] = set()  # noqa: F821
         self._workflow_code = None
         self.resource_list = resource_list or []
 
-    def __enter__(self) -> "Workflow":
+    def __enter__(self) -> Workflow:
         WorkflowContext.set(self)
         return self
 
@@ -290,7 +292,7 @@ class Workflow(Base):
         self._execution_type = val
 
     @property
-    def param_json(self) -> Optional[List[Dict]]:
+    def param_json(self) -> list[dict] | None:
         """Return param json base on self.param."""
         # Handle empty dict and None value
         if not self.param:
@@ -306,7 +308,7 @@ class Workflow(Base):
         ]
 
     @property
-    def task_definition_json(self) -> List[Dict]:
+    def task_definition_json(self) -> list[dict]:
         """Return all tasks definition in list of dict."""
         if not self.tasks:
             return [self.tasks]
@@ -314,7 +316,7 @@ class Workflow(Base):
             return [task.get_define() for task in self.tasks.values()]
 
     @property
-    def task_relation_json(self) -> List[Dict]:
+    def task_relation_json(self) -> list[dict]:
         """Return all relation between tasks pair in list of dict."""
         if not self.tasks:
             return [self.tasks]
@@ -323,7 +325,7 @@ class Workflow(Base):
             return [tr.get_define() for tr in self._task_relations]
 
     @property
-    def schedule_json(self) -> Optional[Dict]:
+    def schedule_json(self) -> dict | None:
         """Get schedule parameter json object. This is requests from java 
gateway interface."""
         if not self.schedule:
             return None
@@ -342,7 +344,7 @@ class Workflow(Base):
             }
 
     @property
-    def task_list(self) -> List["Task"]:  # noqa: F821
+    def task_list(self) -> list[Task]:  # noqa: F821
         """Return list of tasks objects."""
         return list(self.tasks.values())
 
@@ -362,17 +364,17 @@ class Workflow(Base):
                 root_relation = TaskRelation(pre_task_code=0, 
post_task_code=task.code)
                 self._task_relations.add(root_relation)
 
-    def add_task(self, task: "Task") -> None:  # noqa: F821
+    def add_task(self, task: Task) -> None:  # noqa: F821
         """Add a single task to workflow."""
         self.tasks[task.code] = task
         task._workflow = self
 
-    def add_tasks(self, tasks: List["Task"]) -> None:  # noqa: F821
+    def add_tasks(self, tasks: list[Task]) -> None:  # noqa: F821
         """Add task sequence to workflow, it a wrapper of :func:`add_task`."""
         for task in tasks:
             self.add_task(task)
 
-    def get_task(self, code: str) -> "Task":  # noqa: F821
+    def get_task(self, code: str) -> Task:  # noqa: F821
         """Get task object from workflow by given code."""
         if code not in self.tasks:
             raise PyDSTaskNoFoundException(
@@ -382,7 +384,7 @@ class Workflow(Base):
         return self.tasks[code]
 
     # TODO which tying should return in this case
-    def get_tasks_by_name(self, name: str) -> Set["Task"]:  # noqa: F821
+    def get_tasks_by_name(self, name: str) -> set[Task]:  # noqa: F821
         """Get tasks object by given name, if will return all tasks with this 
name."""
         find = set()
         for task in self.tasks.values():
@@ -390,7 +392,7 @@ class Workflow(Base):
                 find.add(task)
         return find
 
-    def get_one_task_by_name(self, name: str) -> "Task":  # noqa: F821
+    def get_one_task_by_name(self, name: str) -> Task:  # noqa: F821
         """Get exact one task from workflow by given name.
 
         Function always return one task even though this workflow have more 
than one task with
diff --git a/src/pydolphinscheduler/core/yaml_workflow.py 
b/src/pydolphinscheduler/core/yaml_workflow.py
index df8d65e..a445534 100644
--- a/src/pydolphinscheduler/core/yaml_workflow.py
+++ b/src/pydolphinscheduler/core/yaml_workflow.py
@@ -17,11 +17,13 @@
 
 """Parse YAML file to create workflow."""
 
+from __future__ import annotations
+
 import logging
 import os
 import re
 from pathlib import Path
-from typing import Any, Dict
+from typing import Any
 
 from pydolphinscheduler import configuration, tasks
 from pydolphinscheduler.constants import Symbol
@@ -250,7 +252,7 @@ class YamlWorkflow(YamlParser):
 
         return content
 
-    def parse_task(self, task_data: dict, name2task: Dict[str, Task]):
+    def parse_task(self, task_data: dict, name2task: dict[str, Task]):
         """Parse various types of tasks.
 
         :param task_data: dict.
diff --git a/src/pydolphinscheduler/java_gateway.py 
b/src/pydolphinscheduler/java_gateway.py
index 91243f6..43e3445 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -16,11 +16,12 @@
 # under the License.
 
 """Module java gateway, contain gateway behavior."""
+from __future__ import annotations
 
 import contextlib
 import warnings
 from logging import getLogger
-from typing import Any, Optional
+from typing import Any
 
 from py4j.java_collections import JavaMap
 from py4j.java_gateway import GatewayParameters, JavaGateway
@@ -36,7 +37,7 @@ logger = getLogger(__name__)
 
 def gateway_result_checker(
     result: JavaMap,
-    msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS,
+    msg_check: str | None = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS,
 ) -> Any:
     """Check weather java gateway result success or not."""
     if (
@@ -57,10 +58,10 @@ class GatewayEntryPoint:
 
     def __init__(
         self,
-        address: Optional[str] = None,
-        port: Optional[int] = None,
-        auto_convert: Optional[bool] = True,
-        auth_token: Optional[str] = None,
+        address: str | None = None,
+        port: int | None = None,
+        auto_convert: bool | None = True,
+        auth_token: str | None = None,
     ):
         self._gateway = None
         self.address = address or configuration.JAVA_GATEWAY_ADDRESS
@@ -110,7 +111,7 @@ class GatewayEntryPoint:
         """Get the java gateway version, expected to be equal with 
pydolphinscheduler."""
         return self.gateway.entry_point.getGatewayVersion()
 
-    def get_datasource(self, name: str, type: Optional[str] = None):
+    def get_datasource(self, name: str, type: str | None = None):
         """Get single datasource by java gateway.
 
         Will use datasource_name to query database, and then filter by 
datasource_type if provided.
@@ -145,7 +146,7 @@ class GatewayEntryPoint:
         )
 
     def create_or_grant_project(
-        self, user: str, name: str, description: Optional[str] = None
+        self, user: str, name: str, description: str | None = None
     ):
         """Create or grant project through java gateway."""
         return self.gateway.entry_point.createOrGrantProject(user, name, 
description)
@@ -167,7 +168,7 @@ class GatewayEntryPoint:
         return self.gateway.entry_point.deleteProject(user, code)
 
     def create_tenant(
-        self, tenant_name: str, queue_name: str, description: Optional[str] = 
None
+        self, tenant_name: str, queue_name: str, description: str | None = None
     ):
         """Create tenant through java gateway."""
         return self.gateway.entry_point.createTenant(
@@ -188,7 +189,7 @@ class GatewayEntryPoint:
         tenant_id: int,
         code: str,
         queue_id: int,
-        description: Optional[str] = None,
+        description: str | None = None,
     ):
         """Update tenant through java gateway."""
         return self.gateway.entry_point.updateTenant(
@@ -241,7 +242,7 @@ class GatewayEntryPoint:
         self,
         project_name: str,
         workflow_name: str,
-        task_name: Optional[str] = None,
+        task_name: str | None = None,
     ):
         """Get dependent info through java gateway."""
         return self.gateway.entry_point.getDependentInfo(
@@ -269,9 +270,9 @@ class GatewayEntryPoint:
         release_state: int,
         task_relation_json: str,
         task_definition_json: str,
-        schedule: Optional[str] = None,
-        online_schedule: Optional[bool] = None,
-        other_params_json: Optional[str] = None,
+        schedule: str | None = None,
+        online_schedule: bool | None = None,
+        other_params_json: str | None = None,
     ):
         """Create or update workflow through java gateway."""
         return self.gateway.entry_point.createOrUpdateWorkflow(
diff --git a/src/pydolphinscheduler/models/base.py 
b/src/pydolphinscheduler/models/base.py
index 007edec..6b0801a 100644
--- a/src/pydolphinscheduler/models/base.py
+++ b/src/pydolphinscheduler/models/base.py
@@ -17,7 +17,7 @@
 
 """DolphinScheduler Base object."""
 
-from typing import Dict, Optional
+from __future__ import annotations
 
 # from pydolphinscheduler.models.user import User
 from pydolphinscheduler.utils.string import attr2camel
@@ -33,9 +33,9 @@ class Base:
     _DEFINE_ATTR: set = set()
 
     # Object default attribute, will add those attribute to `_DEFINE_ATTR` 
when init assign missing.
-    _DEFAULT_ATTR: Dict = {}
+    _DEFAULT_ATTR: dict = {}
 
-    def __init__(self, name: str, description: Optional[str] = None):
+    def __init__(self, name: str, description: str | None = None):
         self.name = name
         self.description = description
 
@@ -49,7 +49,7 @@ class Base:
 
     def get_define_custom(
         self, camel_attr: bool = True, custom_attr: set = None
-    ) -> Dict:
+    ) -> dict:
         """Get object definition attribute by given attr set."""
         content = {}
         for attr in custom_attr:
@@ -60,7 +60,7 @@ class Base:
                 content[attr] = val
         return content
 
-    def get_define(self, camel_attr: bool = True) -> Dict:
+    def get_define(self, camel_attr: bool = True) -> dict:
         """Get object definition attribute communicate to Java gateway server.
 
         use attribute `self._DEFINE_ATTR` to determine which attributes should 
including when
diff --git a/src/pydolphinscheduler/models/base_side.py 
b/src/pydolphinscheduler/models/base_side.py
index 99b4007..2373ad6 100644
--- a/src/pydolphinscheduler/models/base_side.py
+++ b/src/pydolphinscheduler/models/base_side.py
@@ -17,7 +17,7 @@
 
 """Module for models object."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler import configuration
 from pydolphinscheduler.models import Base
@@ -26,7 +26,7 @@ from pydolphinscheduler.models import Base
 class BaseSide(Base):
     """Base class for models object, it declare base behavior for them."""
 
-    def __init__(self, name: str, description: Optional[str] = None):
+    def __init__(self, name: str, description: str | None = None):
         super().__init__(name, description)
 
     @classmethod
diff --git a/src/pydolphinscheduler/models/datasource.py 
b/src/pydolphinscheduler/models/datasource.py
index 5602c30..a7407a2 100644
--- a/src/pydolphinscheduler/models/datasource.py
+++ b/src/pydolphinscheduler/models/datasource.py
@@ -16,9 +16,11 @@
 # under the License.
 
 """Module database."""
+from __future__ import annotations
+
 import json
 import re
-from typing import NamedTuple, Optional
+from dataclasses import dataclass
 
 from py4j.java_gateway import JavaObject
 
@@ -27,7 +29,8 @@ from pydolphinscheduler.models.connection import Connection
 from pydolphinscheduler.models.meta import ModelMeta
 
 
-class TaskUsage(NamedTuple):
+@dataclass
+class TaskUsage:
     """Class for task usage just like datasource in web ui."""
 
     id: int
@@ -92,9 +95,9 @@ class Datasource(metaclass=ModelMeta):
         type_: str,
         name: str,
         connection_params: str,
-        user_id: Optional[int] = None,
-        id_: Optional[int] = None,
-        note: Optional[str] = None,
+        user_id: int | None = None,
+        id_: int | None = None,
+        note: str | None = None,
     ):
         self.id = id_
         self.name = name
@@ -106,8 +109,8 @@ class Datasource(metaclass=ModelMeta):
 
     @classmethod
     def get(
-        cls, datasource_name: str, datasource_type: Optional[str] = None
-    ) -> "Datasource":
+        cls, datasource_name: str, datasource_type: str | None = None
+    ) -> Datasource:
         """Get single datasource.
 
         :param datasource_name: datasource name
@@ -122,10 +125,10 @@ class Datasource(metaclass=ModelMeta):
 
     @classmethod
     def get_task_usage_4j(
-        cls, datasource_name: str, datasource_type: Optional[str] = None
+        cls, datasource_name: str, datasource_type: str | None = None
     ) -> TaskUsage:
         """Get the necessary information of datasource for task usage in web 
UI."""
-        datasource: "Datasource" = cls.get(datasource_name, datasource_type)
+        datasource: Datasource = cls.get(datasource_name, datasource_type)
         return TaskUsage(
             id=datasource.id,
             type=datasource.type.upper(),
diff --git a/src/pydolphinscheduler/models/meta.py 
b/src/pydolphinscheduler/models/meta.py
index 6aea866..f4da69c 100644
--- a/src/pydolphinscheduler/models/meta.py
+++ b/src/pydolphinscheduler/models/meta.py
@@ -22,10 +22,10 @@ This module contains the ModelMeta class, which is used to 
convert ``py4j.java_g
 server to get some resource from database, but you want to make sure the 
return object is a Python
 object.
 """
+from __future__ import annotations
 
 from functools import wraps
 from inspect import signature
-from typing import Dict, Tuple
 
 from py4j.java_gateway import JavaObject
 
@@ -38,7 +38,7 @@ class ModelMeta(type):
     _FUNC_INIT = "__init__"
     _PARAM_SELF = "self"
 
-    def __new__(mcs, name: str, bases: Tuple, attrs: Dict):
+    def __new__(mcs, name: str, bases: tuple, attrs: dict):
         """Create a new class."""
         if mcs._FUNC_INIT not in attrs:
             raise TypeError(
@@ -59,7 +59,7 @@ class ModelMeta(type):
         return super().__new__(mcs, name, bases, attrs)
 
     @classmethod
-    def j2p(mcs, cm: classmethod, name: str, attrs: Dict, params=None):
+    def j2p(mcs, cm: classmethod, name: str, attrs: dict, params=None):
         """Convert JavaObject to Python object according attribute in the 
``__init__`` method.
 
         ``py4j.java_gateway.JavaObject`` return the Java object from the 
DolphinScheduler server, we can
diff --git a/src/pydolphinscheduler/models/project.py 
b/src/pydolphinscheduler/models/project.py
index 308cb66..6d69692 100644
--- a/src/pydolphinscheduler/models/project.py
+++ b/src/pydolphinscheduler/models/project.py
@@ -17,7 +17,7 @@
 
 """DolphinScheduler Project object."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler import configuration
 from pydolphinscheduler.java_gateway import gateway
@@ -30,8 +30,8 @@ class Project(BaseSide):
     def __init__(
         self,
         name: str = configuration.WORKFLOW_PROJECT,
-        description: Optional[str] = None,
-        code: Optional[int] = None,
+        description: str | None = None,
+        code: int | None = None,
     ):
         super().__init__(name, description)
         self.code = code
@@ -43,7 +43,7 @@ class Project(BaseSide):
         # gateway_result_checker(result, None)
 
     @classmethod
-    def get_project_by_name(cls, user=configuration.USER_NAME, name=None) -> 
"Project":
+    def get_project_by_name(cls, user=configuration.USER_NAME, name=None) -> 
Project:
         """Get Project by name."""
         project = gateway.query_project_by_name(user, name)
         if project is None:
diff --git a/src/pydolphinscheduler/models/queue.py 
b/src/pydolphinscheduler/models/queue.py
index e6da259..a6daa21 100644
--- a/src/pydolphinscheduler/models/queue.py
+++ b/src/pydolphinscheduler/models/queue.py
@@ -17,7 +17,7 @@
 
 """DolphinScheduler Queue object."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler import configuration
 from pydolphinscheduler.models import BaseSide
@@ -29,6 +29,6 @@ class Queue(BaseSide):
     def __init__(
         self,
         name: str = configuration.WORKFLOW_QUEUE,
-        description: Optional[str] = "",
+        description: str | None = "",
     ):
         super().__init__(name, description)
diff --git a/src/pydolphinscheduler/models/tenant.py 
b/src/pydolphinscheduler/models/tenant.py
index 10882c2..5503a89 100644
--- a/src/pydolphinscheduler/models/tenant.py
+++ b/src/pydolphinscheduler/models/tenant.py
@@ -17,7 +17,7 @@
 
 """DolphinScheduler Tenant object."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler import configuration
 from pydolphinscheduler.java_gateway import gateway
@@ -31,10 +31,10 @@ class Tenant(BaseSide):
         self,
         name: str = configuration.USER_TENANT,
         queue: str = configuration.WORKFLOW_QUEUE,
-        description: Optional[str] = None,
-        tenant_id: Optional[int] = None,
-        code: Optional[str] = None,
-        user_name: Optional[str] = None,
+        description: str | None = None,
+        tenant_id: int | None = None,
+        code: str | None = None,
+        user_name: str | None = None,
     ):
         super().__init__(name, description)
         self.tenant_id = tenant_id
@@ -52,7 +52,7 @@ class Tenant(BaseSide):
         # gateway_result_checker(result, None)
 
     @classmethod
-    def get_tenant(cls, code: str) -> "Tenant":
+    def get_tenant(cls, code: str) -> Tenant:
         """Get Tenant by code."""
         tenant = gateway.query_tenant(code)
         if tenant is None:
diff --git a/src/pydolphinscheduler/models/user.py 
b/src/pydolphinscheduler/models/user.py
index e58af46..26b56c6 100644
--- a/src/pydolphinscheduler/models/user.py
+++ b/src/pydolphinscheduler/models/user.py
@@ -17,7 +17,7 @@
 
 """DolphinScheduler User object."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler import configuration
 from pydolphinscheduler.java_gateway import gateway
@@ -40,15 +40,15 @@ class User(BaseSide):
     def __init__(
         self,
         name: str,
-        password: Optional[str] = configuration.USER_PASSWORD,
-        email: Optional[str] = configuration.USER_EMAIL,
-        phone: Optional[str] = configuration.USER_PHONE,
-        tenant: Optional[str] = configuration.USER_TENANT,
-        queue: Optional[str] = configuration.WORKFLOW_QUEUE,
-        status: Optional[int] = configuration.USER_STATE,
+        password: str | None = configuration.USER_PASSWORD,
+        email: str | None = configuration.USER_EMAIL,
+        phone: str | None = configuration.USER_PHONE,
+        tenant: str | None = configuration.USER_TENANT,
+        queue: str | None = configuration.WORKFLOW_QUEUE,
+        status: int | None = configuration.USER_STATE,
     ):
         super().__init__(name)
-        self.user_id: Optional[int] = None
+        self.user_id: int | None = None
         self.password = password
         self.email = email
         self.phone = phone
@@ -79,7 +79,7 @@ class User(BaseSide):
         # gateway_result_checker(result, None)
 
     @classmethod
-    def get_user(cls, user_id) -> "User":
+    def get_user(cls, user_id) -> User:
         """Get User."""
         user = gateway.query_user(user_id)
         if user is None:
diff --git a/src/pydolphinscheduler/models/worker_group.py 
b/src/pydolphinscheduler/models/worker_group.py
index bc55eaf..188f1f4 100644
--- a/src/pydolphinscheduler/models/worker_group.py
+++ b/src/pydolphinscheduler/models/worker_group.py
@@ -17,7 +17,7 @@
 
 """DolphinScheduler Worker Group object."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler.models import BaseSide
 
@@ -25,6 +25,6 @@ from pydolphinscheduler.models import BaseSide
 class WorkerGroup(BaseSide):
     """DolphinScheduler Worker Group object."""
 
-    def __init__(self, name: str, address: str, description: Optional[str] = 
None):
+    def __init__(self, name: str, address: str, description: str | None = 
None):
         super().__init__(name, description)
         self.address = address
diff --git a/src/pydolphinscheduler/resources_plugin/base/bucket.py 
b/src/pydolphinscheduler/resources_plugin/base/bucket.py
index a9d48a3..64be694 100644
--- a/src/pydolphinscheduler/resources_plugin/base/bucket.py
+++ b/src/pydolphinscheduler/resources_plugin/base/bucket.py
@@ -16,6 +16,9 @@
 # under the License.
 
 """DolphinScheduler BucketFileInfo and Bucket object."""
+
+from __future__ import annotations
+
 from abc import ABCMeta, abstractmethod
 from typing import Optional
 
@@ -28,11 +31,7 @@ class BucketFileInfo:
     """
 
     def __init__(
-        self,
-        bucket: Optional[str] = None,
-        file_path: Optional[str] = None,
-        *args,
-        **kwargs
+        self, bucket: str | None = None, file_path: str | None = None, *args, 
**kwargs
     ):
         self.bucket = bucket
         self.file_path = file_path
@@ -48,11 +47,11 @@ class OSSFileInfo(BucketFileInfo):
 
     def __init__(
         self,
-        endpoint: Optional[str] = None,
-        bucket: Optional[str] = None,
-        file_path: Optional[str] = None,
+        endpoint: str | None = None,
+        bucket: str | None = None,
+        file_path: str | None = None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(bucket=bucket, file_path=file_path, *args, **kwargs)
         self.endpoint = endpoint
@@ -66,11 +65,7 @@ class S3FileInfo(BucketFileInfo):
     """
 
     def __init__(
-        self,
-        bucket: Optional[str] = None,
-        file_path: Optional[str] = None,
-        *args,
-        **kwargs
+        self, bucket: str | None = None, file_path: str | None = None, *args, 
**kwargs
     ):
         super().__init__(bucket=bucket, file_path=file_path, *args, **kwargs)
 
diff --git a/src/pydolphinscheduler/resources_plugin/base/git.py 
b/src/pydolphinscheduler/resources_plugin/base/git.py
index 53b5509..1cdeb5b 100644
--- a/src/pydolphinscheduler/resources_plugin/base/git.py
+++ b/src/pydolphinscheduler/resources_plugin/base/git.py
@@ -17,6 +17,8 @@
 
 """DolphinScheduler GitFileInfo and Git object."""
 
+from __future__ import annotations
+
 from abc import ABCMeta, abstractmethod
 from typing import Optional
 
@@ -32,12 +34,12 @@ class GitFileInfo:
 
     def __init__(
         self,
-        user: Optional[str] = None,
-        repo_name: Optional[str] = None,
-        branch: Optional[str] = None,
-        file_path: Optional[str] = None,
+        user: str | None = None,
+        repo_name: str | None = None,
+        branch: str | None = None,
+        file_path: str | None = None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         self.user = user
         self.repo_name = repo_name
@@ -56,12 +58,12 @@ class GitHubFileInfo(GitFileInfo):
 
     def __init__(
         self,
-        user: Optional[str] = None,
-        repo_name: Optional[str] = None,
-        branch: Optional[str] = None,
-        file_path: Optional[str] = None,
+        user: str | None = None,
+        repo_name: str | None = None,
+        branch: str | None = None,
+        file_path: str | None = None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(
             user=user,
@@ -69,7 +71,7 @@ class GitHubFileInfo(GitFileInfo):
             branch=branch,
             file_path=file_path,
             *args,
-            **kwargs
+            **kwargs,
         )
 
 
@@ -85,13 +87,13 @@ class GitLabFileInfo(GitFileInfo):
 
     def __init__(
         self,
-        host: Optional[str] = None,
-        user: Optional[str] = None,
-        repo_name: Optional[str] = None,
-        branch: Optional[str] = None,
-        file_path: Optional[str] = None,
+        host: str | None = None,
+        user: str | None = None,
+        repo_name: str | None = None,
+        branch: str | None = None,
+        file_path: str | None = None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(
             user=user,
@@ -99,7 +101,7 @@ class GitLabFileInfo(GitFileInfo):
             branch=branch,
             file_path=file_path,
             *args,
-            **kwargs
+            **kwargs,
         )
         self.host = host
 
diff --git a/src/pydolphinscheduler/resources_plugin/github.py 
b/src/pydolphinscheduler/resources_plugin/github.py
index 4564864..369d377 100644
--- a/src/pydolphinscheduler/resources_plugin/github.py
+++ b/src/pydolphinscheduler/resources_plugin/github.py
@@ -16,8 +16,9 @@
 # under the License.
 
 """DolphinScheduler github resource plugin."""
+from __future__ import annotations
+
 import base64
-from typing import Optional
 from urllib.parse import urljoin
 
 import requests
@@ -34,13 +35,11 @@ class GitHub(ResourcePlugin, Git):
     :param access_token: A string used for identity authentication of GitHub 
private repository.
     """
 
-    def __init__(
-        self, prefix: str, access_token: Optional[str] = None, *args, **kwargs
-    ):
+    def __init__(self, prefix: str, access_token: str | None = None, *args, 
**kwargs):
         super().__init__(prefix, *args, **kwargs)
         self.access_token = access_token
 
-    _git_file_info: Optional[GitHubFileInfo] = None
+    _git_file_info: GitHubFileInfo | None = None
 
     def build_req_api(
         self,
diff --git a/src/pydolphinscheduler/resources_plugin/gitlab.py 
b/src/pydolphinscheduler/resources_plugin/gitlab.py
index fb01117..323b399 100644
--- a/src/pydolphinscheduler/resources_plugin/gitlab.py
+++ b/src/pydolphinscheduler/resources_plugin/gitlab.py
@@ -16,7 +16,8 @@
 # under the License.
 
 """DolphinScheduler gitlab resource plugin."""
-from typing import Optional
+from __future__ import annotations
+
 from urllib.parse import urljoin, urlparse
 
 import gitlab
@@ -40,10 +41,10 @@ class GitLab(ResourcePlugin, Git):
     def __init__(
         self,
         prefix: str,
-        private_token: Optional[str] = None,
-        oauth_token: Optional[str] = None,
-        username: Optional[str] = None,
-        password: Optional[str] = None,
+        private_token: str | None = None,
+        oauth_token: str | None = None,
+        username: str | None = None,
+        password: str | None = None,
         *args,
         **kwargs,
     ):
diff --git a/src/pydolphinscheduler/resources_plugin/oss.py 
b/src/pydolphinscheduler/resources_plugin/oss.py
index 1a9acbb..871b8db 100644
--- a/src/pydolphinscheduler/resources_plugin/oss.py
+++ b/src/pydolphinscheduler/resources_plugin/oss.py
@@ -16,7 +16,8 @@
 # under the License.
 
 """DolphinScheduler oss resource plugin."""
-from typing import Optional
+from __future__ import annotations
+
 from urllib.parse import urljoin, urlparse
 
 import oss2
@@ -37,8 +38,8 @@ class OSS(ResourcePlugin, Bucket):
     def __init__(
         self,
         prefix: str,
-        access_key_id: Optional[str] = None,
-        access_key_secret: Optional[str] = None,
+        access_key_id: str | None = None,
+        access_key_secret: str | None = None,
         *args,
         **kwargs,
     ):
@@ -46,7 +47,7 @@ class OSS(ResourcePlugin, Bucket):
         self.access_key_id = access_key_id
         self.access_key_secret = access_key_secret
 
-    _bucket_file_info: Optional[OSSFileInfo] = None
+    _bucket_file_info: OSSFileInfo | None = None
 
     def get_bucket_file_info(self, path: str):
         """Get file information from the file url, like repository name, user, 
branch, and file path."""
diff --git a/src/pydolphinscheduler/resources_plugin/s3.py 
b/src/pydolphinscheduler/resources_plugin/s3.py
index da1fe83..5ff16e2 100644
--- a/src/pydolphinscheduler/resources_plugin/s3.py
+++ b/src/pydolphinscheduler/resources_plugin/s3.py
@@ -17,7 +17,8 @@
 
 """DolphinScheduler S3 resource plugin."""
 
-from typing import Optional
+from __future__ import annotations
+
 from urllib.parse import urljoin
 
 import boto3
@@ -38,16 +39,16 @@ class S3(ResourcePlugin, Bucket):
     def __init__(
         self,
         prefix: str,
-        access_key_id: Optional[str] = None,
-        access_key_secret: Optional[str] = None,
+        access_key_id: str | None = None,
+        access_key_secret: str | None = None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(prefix, *args, **kwargs)
         self.access_key_id = access_key_id
         self.access_key_secret = access_key_secret
 
-    _bucket_file_info: Optional[S3FileInfo] = None
+    _bucket_file_info: S3FileInfo | None = None
 
     def get_bucket_file_info(self, path: str):
         """Get file information from the file url, like repository name, user, 
branch, and file path."""
diff --git a/src/pydolphinscheduler/tasks/condition.py 
b/src/pydolphinscheduler/tasks/condition.py
index cb139f1..3d8f6f8 100644
--- a/src/pydolphinscheduler/tasks/condition.py
+++ b/src/pydolphinscheduler/tasks/condition.py
@@ -17,7 +17,7 @@
 
 """Task Conditions."""
 
-from typing import Dict, List
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -44,7 +44,7 @@ class Status(Base):
         """Get name for Status or its sub class."""
         return cls.__name__.upper()
 
-    def get_define(self, camel_attr: bool = True) -> List:
+    def get_define(self, camel_attr: bool = True) -> list:
         """Get status definition attribute communicate to Java gateway 
server."""
         content = []
         for task in self.tasks:
@@ -123,7 +123,7 @@ class ConditionOperator(Base):
         setattr(self, attr, result)
         return attr
 
-    def get_define(self, camel_attr=True) -> Dict:
+    def get_define(self, camel_attr=True) -> dict:
         """Overwrite Base.get_define to get task Condition specific get 
define."""
         attr = self.set_define_attr()
         dependent_define_attr = self._DEFINE_ATTR.union({attr})
@@ -184,7 +184,7 @@ class Condition(Task):
         self.set_downstream([self.success_task, self.failed_task])
 
     @property
-    def condition_result(self) -> Dict:
+    def condition_result(self) -> dict:
         """Get condition result define for java gateway."""
         return {
             "successNode": [self.success_task.code],
@@ -192,7 +192,7 @@ class Condition(Task):
         }
 
     @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
Dict:
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
dict:
         """Override Task.task_params for Condition task.
 
         Condition task have some specials attribute `dependence`, and in most 
of the task
diff --git a/src/pydolphinscheduler/tasks/datax.py 
b/src/pydolphinscheduler/tasks/datax.py
index 47b8b1f..148b4b2 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -17,7 +17,7 @@
 
 """Task datax."""
 
-from typing import Dict, List, Optional
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.mixin import WorkerResourceMixin
@@ -42,10 +42,10 @@ class CustomDataX(WorkerResourceMixin, Task):
         self,
         name: str,
         json: str,
-        xms: Optional[int] = 1,
-        xmx: Optional[int] = 1,
+        xms: int | None = 1,
+        xmx: int | None = 1,
         *args,
-        **kwargs
+        **kwargs,
     ):
         self._json = json
         super().__init__(name, TaskType.DATAX, *args, **kwargs)
@@ -100,16 +100,16 @@ class DataX(WorkerResourceMixin, Task):
         datatarget_name: str,
         sql: str,
         target_table: str,
-        datasource_type: Optional[str] = None,
-        datatarget_type: Optional[str] = None,
-        job_speed_byte: Optional[int] = 0,
-        job_speed_record: Optional[int] = 1000,
-        pre_statements: Optional[List[str]] = None,
-        post_statements: Optional[List[str]] = None,
-        xms: Optional[int] = 1,
-        xmx: Optional[int] = 1,
+        datasource_type: str | None = None,
+        datatarget_type: str | None = None,
+        job_speed_byte: int | None = 0,
+        job_speed_record: int | None = 1000,
+        pre_statements: list[str] | None = None,
+        post_statements: list[str] | None = None,
+        xms: int | None = 1,
+        xmx: int | None = 1,
         *args,
-        **kwargs
+        **kwargs,
     ):
         self._sql = sql
         super().__init__(name, TaskType.DATAX, *args, **kwargs)
@@ -128,7 +128,7 @@ class DataX(WorkerResourceMixin, Task):
         self.add_attr(**kwargs)
 
     @property
-    def source_params(self) -> Dict:
+    def source_params(self) -> dict:
         """Get source params for datax task."""
         datasource_task_u = Datasource.get_task_usage_4j(
             self.datasource_name, self.datasource_type
@@ -139,7 +139,7 @@ class DataX(WorkerResourceMixin, Task):
         }
 
     @property
-    def target_params(self) -> Dict:
+    def target_params(self) -> dict:
         """Get target params for datax task."""
         datasource_task_u = Datasource.get_task_usage_4j(
             self.datatarget_name, self.datatarget_type
@@ -150,7 +150,7 @@ class DataX(WorkerResourceMixin, Task):
         }
 
     @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
Dict:
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
dict:
         """Override Task.task_params for datax task.
 
         datax task have some specials attribute for task_params, and is odd if 
we
diff --git a/src/pydolphinscheduler/tasks/dependent.py 
b/src/pydolphinscheduler/tasks/dependent.py
index 98a76ef..6c5ac2b 100644
--- a/src/pydolphinscheduler/tasks/dependent.py
+++ b/src/pydolphinscheduler/tasks/dependent.py
@@ -16,9 +16,9 @@
 # under the License.
 
 """Task dependent."""
+from __future__ import annotations
 
 import warnings
-from typing import Dict, Optional, Tuple
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -91,9 +91,9 @@ class DependentItem(Base):
         self,
         project_name: str,
         # TODO zhongjiajie should also be developed in 4.1.0
-        workflow_name: Optional[str] = None,
-        dependent_task_name: Optional[str] = DEPENDENT_ALL_TASK_IN_WORKFLOW,
-        dependent_date: Optional[DependentDate] = DependentDate.TODAY,
+        workflow_name: str | None = None,
+        dependent_task_name: str | None = DEPENDENT_ALL_TASK_IN_WORKFLOW,
+        dependent_date: DependentDate | None = DependentDate.TODAY,
         *args,
         **kwargs,
     ):
@@ -170,7 +170,7 @@ class DependentItem(Base):
         return self.dependent_task_name == DEPENDENT_ALL_TASK_IN_WORKFLOW
 
     @property
-    def code_parameter(self) -> Tuple:
+    def code_parameter(self) -> tuple:
         """Get name info parameter to query code."""
         param = (
             self.project_name,
@@ -179,7 +179,7 @@ class DependentItem(Base):
         )
         return param
 
-    def get_code_from_gateway(self) -> Dict:
+    def get_code_from_gateway(self) -> dict:
         """Get project, definition, task code from given parameter."""
         if self._code:
             return self._code
@@ -241,7 +241,7 @@ class DependentOperator(Base):
         setattr(self, attr, result)
         return attr
 
-    def get_define(self, camel_attr=True) -> Dict:
+    def get_define(self, camel_attr=True) -> dict:
         """Overwrite Base.get_define to get task dependent specific get 
define."""
         attr = self.set_define_attr()
         dependent_define_attr = self._DEFINE_ATTR.union({attr})
@@ -280,7 +280,7 @@ class Dependent(Task):
         self.dependence = dependence
 
     @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
Dict:
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
dict:
         """Override Task.task_params for dependent task.
 
         Dependent task have some specials attribute `dependence`, and in most 
of the task
diff --git a/src/pydolphinscheduler/tasks/dvc.py 
b/src/pydolphinscheduler/tasks/dvc.py
index c5b5cd5..5cbc6f0 100644
--- a/src/pydolphinscheduler/tasks/dvc.py
+++ b/src/pydolphinscheduler/tasks/dvc.py
@@ -16,8 +16,9 @@
 # under the License.
 
 """Task dvc."""
+from __future__ import annotations
+
 from copy import deepcopy
-from typing import Dict
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -48,7 +49,7 @@ class BaseDVC(Task):
         self.dvc_repository = repository
 
     @property
-    def task_params(self) -> Dict:
+    def task_params(self) -> dict:
         """Return task params."""
         self._task_custom_attr = deepcopy(self._task_custom_attr)
         self._task_custom_attr.update(self._child_task_dvc_attr)
@@ -86,7 +87,7 @@ class DVCDownload(BaseDVC):
         data_path_in_worker: str,
         version: str,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(name, repository, *args, **kwargs)
         self.dvc_data_location = data_path_in_dvc_repository
@@ -115,7 +116,7 @@ class DVCUpload(BaseDVC):
         version: str,
         message: str,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(name, repository, *args, **kwargs)
         self.dvc_data_location = data_path_in_dvc_repository
diff --git a/src/pydolphinscheduler/tasks/flink.py 
b/src/pydolphinscheduler/tasks/flink.py
index 83cae95..fa6a15d 100644
--- a/src/pydolphinscheduler/tasks/flink.py
+++ b/src/pydolphinscheduler/tasks/flink.py
@@ -17,7 +17,7 @@
 
 """Task Flink."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.engine import Engine, ProgramType
@@ -58,19 +58,19 @@ class Flink(Engine):
         name: str,
         main_class: str,
         main_package: str,
-        program_type: Optional[ProgramType] = ProgramType.SCALA,
-        deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
-        flink_version: Optional[FlinkVersion] = FlinkVersion.LOW_VERSION,
-        app_name: Optional[str] = None,
-        job_manager_memory: Optional[str] = "1G",
-        task_manager_memory: Optional[str] = "2G",
-        slot: Optional[int] = 1,
-        task_manager: Optional[int] = 2,
-        parallelism: Optional[int] = 1,
-        main_args: Optional[str] = None,
-        others: Optional[str] = None,
+        program_type: ProgramType | None = ProgramType.SCALA,
+        deploy_mode: DeployMode | None = DeployMode.CLUSTER,
+        flink_version: FlinkVersion | None = FlinkVersion.LOW_VERSION,
+        app_name: str | None = None,
+        job_manager_memory: str | None = "1G",
+        task_manager_memory: str | None = "2G",
+        slot: int | None = 1,
+        task_manager: int | None = 2,
+        parallelism: int | None = 1,
+        main_args: str | None = None,
+        others: str | None = None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(
             name,
@@ -79,7 +79,7 @@ class Flink(Engine):
             main_package,
             program_type,
             *args,
-            **kwargs
+            **kwargs,
         )
         self.deploy_mode = deploy_mode
         self.flink_version = flink_version
diff --git a/src/pydolphinscheduler/tasks/http.py 
b/src/pydolphinscheduler/tasks/http.py
index 781333d..7849317 100644
--- a/src/pydolphinscheduler/tasks/http.py
+++ b/src/pydolphinscheduler/tasks/http.py
@@ -17,7 +17,7 @@
 
 """Task HTTP."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -67,14 +67,14 @@ class Http(Task):
         self,
         name: str,
         url: str,
-        http_method: Optional[str] = HttpMethod.GET,
-        http_params: Optional[str] = None,
-        http_check_condition: Optional[str] = 
HttpCheckCondition.STATUS_CODE_DEFAULT,
-        condition: Optional[str] = None,
-        connect_timeout: Optional[int] = 60000,
-        socket_timeout: Optional[int] = 60000,
+        http_method: str | None = HttpMethod.GET,
+        http_params: str | None = None,
+        http_check_condition: str | None = 
HttpCheckCondition.STATUS_CODE_DEFAULT,
+        condition: str | None = None,
+        connect_timeout: int | None = 60000,
+        socket_timeout: int | None = 60000,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(name, TaskType.HTTP, *args, **kwargs)
         self.url = url
diff --git a/src/pydolphinscheduler/tasks/map_reduce.py 
b/src/pydolphinscheduler/tasks/map_reduce.py
index 5050bd3..146720e 100644
--- a/src/pydolphinscheduler/tasks/map_reduce.py
+++ b/src/pydolphinscheduler/tasks/map_reduce.py
@@ -17,7 +17,7 @@
 
 """Task MR."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.engine import Engine, ProgramType
@@ -37,12 +37,12 @@ class MR(Engine):
         name: str,
         main_class: str,
         main_package: str,
-        program_type: Optional[ProgramType] = ProgramType.SCALA,
-        app_name: Optional[str] = None,
-        main_args: Optional[str] = None,
-        others: Optional[str] = None,
+        program_type: ProgramType | None = ProgramType.SCALA,
+        app_name: str | None = None,
+        main_args: str | None = None,
+        others: str | None = None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(
             name, TaskType.MR, main_class, main_package, program_type, *args, 
**kwargs
diff --git a/src/pydolphinscheduler/tasks/mlflow.py 
b/src/pydolphinscheduler/tasks/mlflow.py
index e86797a..f61fc86 100644
--- a/src/pydolphinscheduler/tasks/mlflow.py
+++ b/src/pydolphinscheduler/tasks/mlflow.py
@@ -16,8 +16,9 @@
 # under the License.
 
 """Task mlflow."""
+from __future__ import annotations
+
 from copy import deepcopy
-from typing import Dict, Optional
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -66,7 +67,7 @@ class BaseMLflow(Task):
         self.mlflow_tracking_uri = mlflow_tracking_uri
 
     @property
-    def task_params(self) -> Dict:
+    def task_params(self) -> dict:
         """Return task params."""
         self._task_custom_attr = deepcopy(self._task_custom_attr)
         self._task_custom_attr.update(self._child_task_mlflow_attr)
@@ -98,11 +99,11 @@ class MLflowModels(BaseMLflow):
         self,
         name: str,
         model_uri: str,
-        mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
-        deploy_mode: Optional[str] = MLflowDeployType.DOCKER,
-        port: Optional[int] = 7000,
+        mlflow_tracking_uri: str | None = DEFAULT_MLFLOW_TRACKING_URI,
+        deploy_mode: str | None = MLflowDeployType.DOCKER,
+        port: int | None = 7000,
         *args,
-        **kwargs
+        **kwargs,
     ):
         """Init mlflow models task."""
         super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
@@ -140,12 +141,12 @@ class MLFlowProjectsCustom(BaseMLflow):
         self,
         name: str,
         repository: str,
-        mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
-        experiment_name: Optional[str] = "",
-        parameters: Optional[str] = "",
-        version: Optional[str] = "master",
+        mlflow_tracking_uri: str | None = DEFAULT_MLFLOW_TRACKING_URI,
+        experiment_name: str | None = "",
+        parameters: str | None = "",
+        version: str | None = "master",
         *args,
-        **kwargs
+        **kwargs,
     ):
         """Init mlflow projects task."""
         super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
@@ -185,13 +186,13 @@ class MLFlowProjectsAutoML(BaseMLflow):
         self,
         name: str,
         data_path: str,
-        automl_tool: Optional[str] = "flaml",
-        mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
-        experiment_name: Optional[str] = "",
-        model_name: Optional[str] = "",
-        parameters: Optional[str] = "",
+        automl_tool: str | None = "flaml",
+        mlflow_tracking_uri: str | None = DEFAULT_MLFLOW_TRACKING_URI,
+        experiment_name: str | None = "",
+        model_name: str | None = "",
+        parameters: str | None = "",
         *args,
-        **kwargs
+        **kwargs,
     ):
         """Init mlflow projects task."""
         super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
@@ -236,14 +237,14 @@ class MLFlowProjectsBasicAlgorithm(BaseMLflow):
         self,
         name: str,
         data_path: str,
-        algorithm: Optional[str] = "lightgbm",
-        mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
-        experiment_name: Optional[str] = "",
-        model_name: Optional[str] = "",
-        parameters: Optional[str] = "",
-        search_params: Optional[str] = "",
+        algorithm: str | None = "lightgbm",
+        mlflow_tracking_uri: str | None = DEFAULT_MLFLOW_TRACKING_URI,
+        experiment_name: str | None = "",
+        model_name: str | None = "",
+        parameters: str | None = "",
+        search_params: str | None = "",
         *args,
-        **kwargs
+        **kwargs,
     ):
         """Init mlflow projects task."""
         super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
diff --git a/src/pydolphinscheduler/tasks/procedure.py b/src/pydolphinscheduler/tasks/procedure.py
index 5a9d87b..807196a 100644
--- a/src/pydolphinscheduler/tasks/procedure.py
+++ b/src/pydolphinscheduler/tasks/procedure.py
@@ -17,7 +17,7 @@
 
 """Task procedure."""
 
-from typing import Dict, Optional
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -47,9 +47,9 @@ class Procedure(Task):
         name: str,
         datasource_name: str,
         method: str,
-        datasource_type: Optional[str] = None,
+        datasource_type: str | None = None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(name, TaskType.PROCEDURE, *args, **kwargs)
         self.datasource_name = datasource_name
@@ -57,7 +57,7 @@ class Procedure(Task):
         self.method = method
 
     @property
-    def datasource(self) -> Dict:
+    def datasource(self) -> dict:
         """Get datasource for procedure task."""
         datasource_task_u = Datasource.get_task_usage_4j(
             self.datasource_name, self.datasource_type
@@ -68,7 +68,7 @@ class Procedure(Task):
         }
 
     @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> dict:
         """Override Task.task_params for produce task.
 
         produce task have some specials attribute for task_params, and is odd if we
diff --git a/src/pydolphinscheduler/tasks/python.py b/src/pydolphinscheduler/tasks/python.py
index 93cec00..8e2729f 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -17,11 +17,12 @@
 
 """Task Python."""
 
+from __future__ import annotations
+
 import logging
 import re
 import types
 from pathlib import Path
-from typing import Union
 
 from stmdency.extractor import Extractor
 
@@ -61,10 +62,10 @@ class Python(WorkerResourceMixin, Task):
     _task_custom_attr = {"raw_script"}
 
     ext: set = {".py"}
-    ext_attr: Union[str, types.FunctionType] = "_definition"
+    ext_attr: str | types.FunctionType = "_definition"
 
     def __init__(
-        self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
+        self, name: str, definition: str | types.FunctionType, *args, **kwargs
     ):
         self._definition = definition
         super().__init__(name, TaskType.PYTHON, *args, **kwargs)
diff --git a/src/pydolphinscheduler/tasks/pytorch.py b/src/pydolphinscheduler/tasks/pytorch.py
index 8ae371f..0ba0d7d 100644
--- a/src/pydolphinscheduler/tasks/pytorch.py
+++ b/src/pydolphinscheduler/tasks/pytorch.py
@@ -16,7 +16,7 @@
 # under the License.
 
 """Task Pytorch."""
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.mixin import WorkerResourceMixin
@@ -65,12 +65,12 @@ class Pytorch(WorkerResourceMixin, Task):
         name: str,
         script: str,
         script_params: str = "",
-        project_path: Optional[str] = DEFAULT.project_path,
-        is_create_environment: Optional[bool] = DEFAULT.is_create_environment,
-        python_command: Optional[str] = DEFAULT.python_command,
-        python_env_tool: Optional[str] = "conda",
-        requirements: Optional[str] = "requirements.txt",
-        conda_python_version: Optional[str] = "3.7",
+        project_path: str | None = DEFAULT.project_path,
+        is_create_environment: bool | None = DEFAULT.is_create_environment,
+        python_command: str | None = DEFAULT.python_command,
+        python_env_tool: str | None = "conda",
+        requirements: str | None = "requirements.txt",
+        conda_python_version: str | None = "3.7",
         *args,
         **kwargs,
     ):
diff --git a/src/pydolphinscheduler/tasks/spark.py b/src/pydolphinscheduler/tasks/spark.py
index eb9c621..93adb6b 100644
--- a/src/pydolphinscheduler/tasks/spark.py
+++ b/src/pydolphinscheduler/tasks/spark.py
@@ -17,7 +17,7 @@
 
 """Task Spark."""
 
-from typing import Optional
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.engine import Engine, ProgramType
@@ -51,18 +51,18 @@ class Spark(Engine):
         name: str,
         main_class: str,
         main_package: str,
-        program_type: Optional[ProgramType] = ProgramType.SCALA,
-        deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
-        app_name: Optional[str] = None,
-        driver_cores: Optional[int] = 1,
-        driver_memory: Optional[str] = "512M",
-        num_executors: Optional[int] = 2,
-        executor_memory: Optional[str] = "2G",
-        executor_cores: Optional[int] = 2,
-        main_args: Optional[str] = None,
-        others: Optional[str] = None,
+        program_type: ProgramType | None = ProgramType.SCALA,
+        deploy_mode: DeployMode | None = DeployMode.CLUSTER,
+        app_name: str | None = None,
+        driver_cores: int | None = 1,
+        driver_memory: str | None = "512M",
+        num_executors: int | None = 2,
+        executor_memory: str | None = "2G",
+        executor_cores: int | None = 2,
+        main_args: str | None = None,
+        others: str | None = None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         super().__init__(
             name,
@@ -71,7 +71,7 @@ class Spark(Engine):
             main_package,
             program_type,
             *args,
-            **kwargs
+            **kwargs,
         )
         self.deploy_mode = deploy_mode
         self.app_name = app_name
diff --git a/src/pydolphinscheduler/tasks/sql.py b/src/pydolphinscheduler/tasks/sql.py
index a320d43..520c291 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -16,10 +16,11 @@
 # under the License.
 
 """Task sql."""
+from __future__ import annotations
 
 import logging
 import re
-from typing import Dict, List, Optional, Sequence, Union
+from collections.abc import Sequence
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -79,13 +80,13 @@ class Sql(Task):
         name: str,
         datasource_name: str,
         sql: str,
-        datasource_type: Optional[str] = None,
-        sql_type: Optional[str] = None,
-        pre_statements: Union[str, Sequence[str], None] = None,
-        post_statements: Union[str, Sequence[str], None] = None,
-        display_rows: Optional[int] = 10,
+        datasource_type: str | None = None,
+        sql_type: str | None = None,
+        pre_statements: str | Sequence[str] | None = None,
+        post_statements: str | Sequence[str] | None = None,
+        display_rows: int | None = 10,
         *args,
-        **kwargs
+        **kwargs,
     ):
         self._sql = sql
         super().__init__(name, TaskType.SQL, *args, **kwargs)
@@ -97,7 +98,7 @@ class Sql(Task):
         self.display_rows = display_rows
 
     @staticmethod
-    def get_stm_list(stm: Union[str, Sequence[str]]) -> List[str]:
+    def get_stm_list(stm: str | Sequence[str]) -> list[str]:
         """Convert statement to str of list.
 
         :param stm: statements string
@@ -137,7 +138,7 @@ class Sql(Task):
             return SqlType.SELECT
 
     @property
-    def datasource(self) -> Dict:
+    def datasource(self) -> dict:
         """Get datasource for procedure sql."""
         datasource_task_u = Datasource.get_task_usage_4j(
             self.datasource_name, self.datasource_type
@@ -148,7 +149,7 @@ class Sql(Task):
         }
 
     @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> dict:
         """Override Task.task_params for sql task.
 
         sql task have some specials attribute for task_params, and is odd if we
diff --git a/src/pydolphinscheduler/tasks/sub_workflow.py b/src/pydolphinscheduler/tasks/sub_workflow.py
index 1cb579c..715a21e 100644
--- a/src/pydolphinscheduler/tasks/sub_workflow.py
+++ b/src/pydolphinscheduler/tasks/sub_workflow.py
@@ -17,7 +17,7 @@
 
 """Task sub workflow."""
 
-from typing import Dict
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -43,7 +43,7 @@ class SubWorkflow(Task):
         """
         return self.get_workflow_info(self.workflow_name).get("code")
 
-    def get_workflow_info(self, workflow_name: str) -> Dict:
+    def get_workflow_info(self, workflow_name: str) -> dict:
         """Get workflow info from java gateway, contains workflow id, name, 
code."""
         if not self.workflow:
             raise PyDSWorkflowNotAssignException(
diff --git a/src/pydolphinscheduler/tasks/switch.py b/src/pydolphinscheduler/tasks/switch.py
index acc23ba..b0969af 100644
--- a/src/pydolphinscheduler/tasks/switch.py
+++ b/src/pydolphinscheduler/tasks/switch.py
@@ -16,8 +16,7 @@
 # under the License.
 
 """Task Switch."""
-
-from typing import Dict, Optional
+from __future__ import annotations
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
@@ -35,7 +34,7 @@ class SwitchBranch(Base):
         "next_node",
     }
 
-    def __init__(self, task: Task, exp: Optional[str] = None):
+    def __init__(self, task: Task, exp: str | None = None):
         super().__init__(f"Switch.{self.__class__.__name__.upper()}")
         self.task = task
         self.exp = exp
@@ -46,11 +45,11 @@ class SwitchBranch(Base):
         return self.task.code
 
     @property
-    def condition(self) -> Optional[str]:
+    def condition(self) -> str | None:
         """Get task switch property condition."""
         return self.exp
 
-    def get_define(self, camel_attr: bool = True) -> Dict:
+    def get_define(self, camel_attr: bool = True) -> dict:
         """Get :class:`ConditionBranch` definition attribute communicate to 
Java gateway server."""
         if self.condition:
             self._DEFINE_ATTR.add("condition")
@@ -121,7 +120,7 @@ class SwitchCondition(Base):
             setattr(self, "next_node", "")
         setattr(self, "depend_task_list", result)
 
-    def get_define(self, camel_attr=True) -> Dict:
+    def get_define(self, camel_attr=True) -> dict:
         """Overwrite Base.get_define to get task Condition specific get 
define."""
         self.set_define_attr()
         return super().get_define()
@@ -154,7 +153,7 @@ class Switch(Task):
         self.set_downstream(downstream)
 
     @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> dict:
         """Override Task.task_params for switch task.
 
         switch task have some specials attribute `switch`, and in most of the task
diff --git a/src/pydolphinscheduler/utils/file.py b/src/pydolphinscheduler/utils/file.py
index 075b902..740f8ba 100644
--- a/src/pydolphinscheduler/utils/file.py
+++ b/src/pydolphinscheduler/utils/file.py
@@ -17,15 +17,16 @@
 
 """File util for pydolphinscheduler."""
 
+from __future__ import annotations
+
 from pathlib import Path
-from typing import Optional
 
 
 def write(
     content: str,
     to_path: str,
-    create: Optional[bool] = True,
-    overwrite: Optional[bool] = False,
+    create: bool | None = True,
+    overwrite: bool | None = False,
 ) -> None:
     """Write configs dict to configuration file.
 
diff --git a/src/pydolphinscheduler/utils/yaml_parser.py b/src/pydolphinscheduler/utils/yaml_parser.py
index 46ee08c..a79dfec 100644
--- a/src/pydolphinscheduler/utils/yaml_parser.py
+++ b/src/pydolphinscheduler/utils/yaml_parser.py
@@ -16,10 +16,11 @@
 # under the License.
 
 """YAML parser utils, parser yaml string to ``ruamel.yaml`` object and nested 
key dict."""
+from __future__ import annotations
 
 import copy
 import io
-from typing import Any, Dict, Optional
+from typing import Any
 
 from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap
@@ -58,7 +59,7 @@ class YamlParser:
         yaml_parser["one.two2"] = "value4"
     """
 
-    def __init__(self, content: str, delimiter: Optional[str] = "."):
+    def __init__(self, content: str, delimiter: str | None = "."):
         self._content = content
         self.src_parser = content
         self._delimiter = delimiter
@@ -75,7 +76,7 @@ class YamlParser:
         self._src_parser = self._yaml.load(content)
 
     def parse_nested_dict(
-        self, result: Dict, commented_map: CommentedMap, key: str
+        self, result: dict, commented_map: CommentedMap, key: str
     ) -> None:
         """Parse :class:`ruamel.yaml.comments.CommentedMap` to nested dict 
using :param:`delimiter`."""
         if not isinstance(commented_map, CommentedMap):
@@ -86,7 +87,7 @@ class YamlParser:
             self.parse_nested_dict(result, commented_map[sub_key], next_key)
 
     @property
-    def dict_parser(self) -> Dict:
+    def dict_parser(self) -> dict:
         """Get :class:`CommentedMap` to nested dict using :param:`delimiter` 
as key delimiter.
 
         Use Depth-First-Search get all nested key and value, and all key connect by :param:`delimiter`.
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 3e62157..8b7d2a0 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -16,11 +16,12 @@
 # under the License.
 
 """Test Task class function."""
+from __future__ import annotations
+
 import logging
 import re
 import warnings
 from datetime import timedelta
-from typing import Set, Tuple, Union
 from unittest.mock import PropertyMock, patch
 
 import pytest
@@ -92,7 +93,7 @@ TEST_TASK_RELATION_SIZE = 0
         ),
     ],
 )
-def test__get_attr(addition: Set, ignore: Set, expect: Set):
+def test__get_attr(addition: set, ignore: set, expect: set):
     """Test task function `_get_attr`."""
     task = TestTask(
         name="test-get-attr",
@@ -117,7 +118,7 @@ def test__get_attr(addition: Set, ignore: Set, expect: Set):
         (None, (0, "CLOSE")),
     ],
 )
-def test_task_timeout(value: Union[timedelta, int], expect: Tuple[int, str]):
+def test_task_timeout(value: timedelta | int, expect: tuple[int, str]):
     """Test task timout attribute."""
     task = TestTask(
         name="test-get-attr",
diff --git a/tests/core/test_workflow.py b/tests/core/test_workflow.py
index 3e9fbd0..66748ed 100644
--- a/tests/core/test_workflow.py
+++ b/tests/core/test_workflow.py
@@ -16,9 +16,11 @@
 # under the License.
 
 """Test workflow."""
+from __future__ import annotations
+
 import warnings
 from datetime import datetime, timedelta
-from typing import Any, List, Union
+from typing import Any
 from unittest.mock import patch
 
 import pytest
@@ -93,7 +95,7 @@ def test_workflow_default_value(name, value):
         ("param", dict, {"key": "value"}),
         (
             "resource_list",
-            List,
+            list,
             [Resource(name="/dev/test.py", content="hello world", description="desc")],
         ),
     ],
@@ -117,7 +119,7 @@ def test_set_attr(name, cls, expect):
         (timedelta(seconds=360), 6),
     ],
 )
-def test_workflow_timeout(value: Union[timedelta, int], expect: int):
+def test_workflow_timeout(value: timedelta | int, expect: int):
     """Test workflow timout attribute."""
     with Workflow(TEST_WORKFLOW_NAME, timeout=value) as workflow:
         assert (
diff --git a/tests/integration/test_process_definition.py b/tests/integration/test_process_definition.py
index 010ca76..42aa96a 100644
--- a/tests/integration/test_process_definition.py
+++ b/tests/integration/test_process_definition.py
@@ -16,8 +16,7 @@
 # under the License.
 
 """Test workflow in integration."""
-
-from typing import Dict
+from __future__ import annotations
 
 import pytest
 
@@ -41,7 +40,7 @@ TASK_NAME = f"task_{WORKFLOW_NAME}"
         )
     ],
 )
-def test_change_workflow_attr(pre: Dict, post: Dict):
+def test_change_workflow_attr(pre: dict, post: dict):
     """Test whether workflow success when specific attribute change."""
     assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
     for attrs in [pre, post]:
diff --git a/tests/tasks/test_condition.py b/tests/tasks/test_condition.py
index 217c727..41dbdb2 100644
--- a/tests/tasks/test_condition.py
+++ b/tests/tasks/test_condition.py
@@ -16,7 +16,8 @@
 # under the License.
 
 """Test Task dependent."""
-from typing import List, Tuple
+from __future__ import annotations
+
 from unittest.mock import patch
 
 import pytest
@@ -64,7 +65,7 @@ def test_class_status_status_name(obj: Status, expect: str):
         (FAILURE, (ConditionOperator(1), ConditionOperator(2), ConditionOperator(3))),
     ],
 )
-def test_class_status_depend_item_list_no_expect_type(obj: Status, tasks: Tuple):
+def test_class_status_depend_item_list_no_expect_type(obj: Status, tasks: tuple):
     """Test class status and sub class raise error when assign not support 
type."""
     with pytest.raises(
         PyDSParamException, match=".*?only accept class Task or sub class Task, but get"
@@ -86,7 +87,7 @@ def test_class_status_depend_item_list_no_expect_type(obj: Status, tasks: Tuple)
         (FAILURE, [Task(str(i), TEST_TYPE) for i in range(3)]),
     ],
 )
-def test_class_status_depend_item_list(obj: Status, tasks: Tuple):
+def test_class_status_depend_item_list(obj: Status, tasks: tuple):
     """Test class status and sub class function :func:`depend_item_list`."""
     status = obj.status_name()
     expect = [
@@ -218,7 +219,7 @@ def test_condition_operator_set_define_attr_status(
     ],
 )
 def test_condition_operator_set_define_attr_mix_status(
-    obj: ConditionOperator, status: List[Status]
+    obj: ConditionOperator, status: list[Status]
 ):
     """Test :func:`set_define_attr` with one or more mixed status."""
     attr = "depend_item_list"
@@ -288,7 +289,7 @@ def test_condition_operator_set_define_attr_operator(
     ],
 )
 def test_condition_operator_set_define_attr_mix_operator(
-    cond: ConditionOperator, sub_cond: Tuple[ConditionOperator]
+    cond: ConditionOperator, sub_cond: tuple[ConditionOperator]
 ):
     """Test :func:`set_define_attr` with one or more class mix condition 
operator."""
     attr = "depend_task_list"
diff --git a/tests/tasks/test_dependent.py b/tests/tasks/test_dependent.py
index e35ca87..4be4501 100644
--- a/tests/tasks/test_dependent.py
+++ b/tests/tasks/test_dependent.py
@@ -16,9 +16,10 @@
 # under the License.
 
 """Test Task dependent."""
+from __future__ import annotations
+
 import itertools
 import warnings
-from typing import Dict, List, Optional, Tuple, Union
 from unittest.mock import patch
 
 import pytest
@@ -162,7 +163,7 @@ def test_dependent_item_date_error():
         ({}, None),
     ],
 )
-def test_dependent_item_code_parameter(task_name: dict, result: Optional[str]):
+def test_dependent_item_code_parameter(task_name: dict, result: str | None):
     """Test dependent item property code_parameter."""
     dependent_item = DependentItem(
         project_name=TEST_PROJECT,
@@ -358,9 +359,9 @@ def test_dependent_operator_set_define_error(mock_code, arg_list):
 )
 def test_operator_dependent_item(
     mock_code_info,
-    operators: Tuple[DependentOperator],
-    kwargs: Tuple[dict],
-    expect: List[Dict],
+    operators: tuple[DependentOperator],
+    kwargs: tuple[dict],
+    expect: list[dict],
 ):
     """Test DependentOperator(DependentItem) function get_define.
 
@@ -557,9 +558,9 @@ def test_operator_dependent_item(
 )
 def test_operator_dependent_task_list_multi_dependent_item(
     mock_code_info,
-    operators: Tuple[DependentOperator],
-    args: Tuple[Union[Tuple, dict]],
-    expect: List[Dict],
+    operators: tuple[DependentOperator],
+    args: tuple[tuple | dict],
+    expect: list[dict],
 ):
     """Test DependentOperator(DependentOperator(DependentItem)) single 
operator function get_define.
 
@@ -687,9 +688,9 @@ def get_dep_task_list(*operator):
 )
 def test_operator_dependent_task_list_multi_dependent_list(
     mock_code_info,
-    operators: Tuple[DependentOperator],
-    args: Tuple[Union[Tuple, dict]],
-    expect: List[Dict],
+    operators: tuple[DependentOperator],
+    args: tuple[tuple | dict],
+    expect: list[dict],
 ):
     """Test DependentOperator(DependentOperator(DependentItem)) multiply 
operator function get_define.
 
diff --git a/tests/tasks/test_switch.py b/tests/tasks/test_switch.py
index dcd5e40..e9a24bf 100644
--- a/tests/tasks/test_switch.py
+++ b/tests/tasks/test_switch.py
@@ -16,8 +16,8 @@
 # under the License.
 
 """Test Task switch."""
+from __future__ import annotations
 
-from typing import Optional, Tuple
 from unittest.mock import patch
 
 import pytest
@@ -37,7 +37,7 @@ TEST_NAME = "test-task"
 TEST_TYPE = "test-type"
 
 
-def task_switch_arg_wrapper(obj, task: Task, exp: Optional[str] = None) -> SwitchBranch:
+def task_switch_arg_wrapper(obj, task: Task, exp: str | None = None) -> SwitchBranch:
     """Wrap task switch and its subclass."""
     if obj is Default:
         return obj(task)
@@ -121,7 +121,7 @@ def test_switch_branch_get_define_condition(obj: SwitchBranch):
         ),
     ],
 )
-def test_switch_condition_set_define_attr_error(args: Tuple, msg: str):
+def test_switch_condition_set_define_attr_error(args: tuple, msg: str):
     """Test error case on :class:`SwitchCondition`."""
     switch_condition = SwitchCondition(*args)
     with pytest.raises(PyDSParamException, match=msg):
diff --git a/tests/testing/docker_wrapper.py b/tests/testing/docker_wrapper.py
index a3d0b6e..70ca98d 100644
--- a/tests/testing/docker_wrapper.py
+++ b/tests/testing/docker_wrapper.py
@@ -16,9 +16,9 @@
 # under the License.
 
 """Wrap docker commands for easier create docker container."""
+from __future__ import annotations
 
 import time
-from typing import Optional
 
 import docker
 from docker.errors import ImageNotFound
@@ -50,7 +50,7 @@ class DockerWrapper:
         )
 
     def run_until_log(
-        self, log: str, remove_exists: Optional[bool] = True, *args, **kwargs
+        self, log: str, remove_exists: bool | None = True, *args, **kwargs
     ) -> Container:
         """Create and run a new container, return when specific log appear.
 
diff --git a/tests/testing/file.py b/tests/testing/file.py
index 9816b94..67fd4ba 100644
--- a/tests/testing/file.py
+++ b/tests/testing/file.py
@@ -16,18 +16,18 @@
 # under the License.
 
 """Testing util about file operating."""
+from __future__ import annotations
 
 from pathlib import Path
-from typing import Union
 
 
-def get_file_content(path: Union[str, Path]) -> str:
+def get_file_content(path: str | Path) -> str:
     """Get file content in given path."""
     with open(path) as f:
         return f.read()
 
 
-def delete_file(path: Union[str, Path]) -> None:
+def delete_file(path: str | Path) -> None:
     """Delete file in given path."""
     path = Path(path).expanduser() if isinstance(path, str) else path.expanduser()
     if path.exists():
diff --git a/tests/testing/path.py b/tests/testing/path.py
index 974ab3d..0359a23 100644
--- a/tests/testing/path.py
+++ b/tests/testing/path.py
@@ -16,9 +16,11 @@
 # under the License.
 
 """Handle path related issue in test module."""
+from __future__ import annotations
 
+from collections.abc import Generator
 from pathlib import Path
-from typing import Any, Generator
+from typing import Any
 
 project_root = Path(__file__).parent.parent.parent
 
diff --git a/tests/utils/test_date.py b/tests/utils/test_date.py
index 6546c4a..9599d61 100644
--- a/tests/utils/test_date.py
+++ b/tests/utils/test_date.py
@@ -16,9 +16,9 @@
 # under the License.
 
 """Test utils.date module."""
+from __future__ import annotations
 
 from datetime import datetime, timedelta
-from typing import Dict
 
 import pytest
 
@@ -97,7 +97,7 @@ def test_conv_from_str_not_impl(src: str) -> None:
         ({"hours": 1.3}, 78),
     ],
 )
-def test_timedelta2timeout(src: Dict, expect: int) -> None:
+def test_timedelta2timeout(src: dict, expect: int) -> None:
     """Test function timedelta2timeout."""
     td = timedelta(**src)
     assert timedelta2timeout(td) == expect, f"Test case {td} not expect to {expect}."
diff --git a/tests/utils/test_yaml_parser.py b/tests/utils/test_yaml_parser.py
index 6f084ba..c017067 100644
--- a/tests/utils/test_yaml_parser.py
+++ b/tests/utils/test_yaml_parser.py
@@ -16,8 +16,7 @@
 # under the License.
 
 """Test utils.path_dict module."""
-
-from typing import Dict
+from __future__ import annotations
 
 import pytest
 from ruamel.yaml import YAML
@@ -99,7 +98,7 @@ with open(path_default_config_yaml) as f:
         ),
     ],
 )
-def test_yaml_parser_specific_delimiter(src: str, delimiter: str, expect: Dict):
+def test_yaml_parser_specific_delimiter(src: str, delimiter: str, expect: dict):
     """Test specific delimiter for :class:`YamlParser`."""
 
     def ch_dl(key):
@@ -135,7 +134,7 @@ def test_yaml_parser_specific_delimiter(src: str, delimiter: str, expect: Dict):
         ),
     ],
 )
-def test_yaml_parser_contains(src: str, expect: Dict):
+def test_yaml_parser_contains(src: str, expect: dict):
     """Test magic function :func:`YamlParser.__contain__` also with `key in 
obj` syntax."""
     yaml_parser = YamlParser(src)
     assert len(expect.keys()) == len(
@@ -159,7 +158,7 @@ def test_yaml_parser_contains(src: str, expect: Dict):
         ),
     ],
 )
-def test_yaml_parser_get(src: str, expect: Dict):
+def test_yaml_parser_get(src: str, expect: dict):
     """Test magic function :func:`YamlParser.__getitem__` also with `obj[key]` 
syntax."""
     yaml_parser = YamlParser(src)
     assert all(
@@ -191,7 +190,7 @@ def test_yaml_parser_get(src: str, expect: Dict):
         ),
     ],
 )
-def test_yaml_parser_set(src: str, expect: Dict):
+def test_yaml_parser_set(src: str, expect: dict):
     """Test magic function :func:`YamlParser.__setitem__` also with `obj[key] 
= val` syntax."""
     yaml_parser = YamlParser(src)
     for key in expect:
@@ -241,7 +240,7 @@ name:
         ),
     ],
 )
-def test_yaml_parser_str_repr(src: str, setter: Dict, expect: str):
+def test_yaml_parser_str_repr(src: str, setter: dict, expect: str):
     """Test function :func:`YamlParser.to_string`."""
     yaml_parser = YamlParser(src)
 


Reply via email to