This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0c8295ce91 Improve importing the modules in Airflow serialization,
task, template and ti_deps packages (#33812)
0c8295ce91 is described below
commit 0c8295ce916238bc388632bda68e503fd4987338
Author: Hussein Awala <[email protected]>
AuthorDate: Mon Aug 28 07:23:14 2023 +0200
Improve importing the modules in Airflow serialization, task, template and
ti_deps packages (#33812)
---
airflow/serialization/serde.py | 6 ++++--
airflow/serialization/serialized_objects.py | 19 ++++++++++++-------
airflow/task/task_runner/__init__.py | 7 +++++--
airflow/task/task_runner/base_task_runner.py | 6 +++++-
airflow/task/task_runner/cgroup_task_runner.py | 5 ++++-
airflow/task/task_runner/standard_task_runner.py | 5 ++++-
airflow/template/templater.py | 2 +-
airflow/ti_deps/dep_context.py | 3 ++-
airflow/ti_deps/deps/prev_dagrun_dep.py | 3 ++-
airflow/ti_deps/deps/trigger_rule_dep.py | 5 +++--
10 files changed, 42 insertions(+), 19 deletions(-)
diff --git a/airflow/serialization/serde.py b/airflow/serialization/serde.py
index 5e5908df90..006d9d9318 100644
--- a/airflow/serialization/serde.py
+++ b/airflow/serialization/serde.py
@@ -23,8 +23,7 @@ import functools
import logging
import sys
from importlib import import_module
-from types import ModuleType
-from typing import Any, Pattern, TypeVar, Union, cast
+from typing import TYPE_CHECKING, Any, Pattern, TypeVar, Union, cast
import attr
import re2
@@ -34,6 +33,9 @@ from airflow.configuration import conf
from airflow.stats import Stats
from airflow.utils.module_loading import import_string, iter_namespace, qualname
+if TYPE_CHECKING:
+ from types import ModuleType
+
log = logging.getLogger(__name__)
MAX_RECURSION_DEPTH = sys.getrecursionlimit() - 1
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index b3a252783a..a63843e971 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -25,7 +25,7 @@ import logging
import warnings
import weakref
from dataclasses import dataclass
-from inspect import Parameter, signature
+from inspect import signature
from typing import TYPE_CHECKING, Any, Collection, Iterable, Mapping, NamedTuple, Union
import cattr
@@ -39,27 +39,24 @@ from airflow.configuration import conf
from airflow.datasets import Dataset
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, SerializationError
from airflow.jobs.job import Job
-from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
+from airflow.models.baseoperator import BaseOperator
from airflow.models.connection import Connection
from airflow.models.dag import DAG, create_timetable
from airflow.models.dagrun import DagRun
-from airflow.models.expandinput import EXPAND_INPUT_EMPTY, ExpandInput, create_expand_input, get_map_type_key
+from airflow.models.expandinput import EXPAND_INPUT_EMPTY, create_expand_input, get_map_type_key
from airflow.models.mappedoperator import MappedOperator
-from airflow.models.operator import Operator
from airflow.models.param import Param, ParamsDict
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
-from airflow.models.taskmixin import DAGNode
from airflow.models.xcom_arg import XComArg, deserialize_xcom_arg, serialize_xcom_arg
from airflow.providers_manager import ProvidersManager
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.serialization.helpers import serialize_template_field
-from airflow.serialization.json_schema import Validator, load_dag_schema
+from airflow.serialization.json_schema import load_dag_schema
from airflow.serialization.pydantic.dag_run import DagRunPydantic
from airflow.serialization.pydantic.dataset import DatasetPydantic
from airflow.serialization.pydantic.job import JobPydantic
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.settings import _ENABLE_AIP_44, DAGS_FOLDER, json
-from airflow.timetables.base import Timetable
from airflow.utils.code_utils import get_python_source
from airflow.utils.docs import get_docs_url
from airflow.utils.module_loading import import_string, qualname
@@ -68,9 +65,17 @@ from airflow.utils.task_group import MappedTaskGroup, TaskGroup
from airflow.utils.types import NOTSET, ArgNotSet
if TYPE_CHECKING:
+ from inspect import Parameter
+
from pydantic import BaseModel
+ from airflow.models.baseoperator import BaseOperatorLink
+ from airflow.models.expandinput import ExpandInput
+ from airflow.models.operator import Operator
+ from airflow.models.taskmixin import DAGNode
+ from airflow.serialization.json_schema import Validator
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+ from airflow.timetables.base import Timetable
HAS_KUBERNETES: bool
try:
diff --git a/airflow/task/task_runner/__init__.py b/airflow/task/task_runner/__init__.py
index 078b24a402..11a35c177b 100644
--- a/airflow/task/task_runner/__init__.py
+++ b/airflow/task/task_runner/__init__.py
@@ -18,13 +18,16 @@
from __future__ import annotations
import logging
+from typing import TYPE_CHECKING
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
-from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
-from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.module_loading import import_string
+if TYPE_CHECKING:
+ from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
+ from airflow.task.task_runner.base_task_runner import BaseTaskRunner
+
log = logging.getLogger(__name__)
_TASK_RUNNER_NAME = conf.get("core", "TASK_RUNNER")
diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py
index c1045280dc..979a547d3d 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -22,7 +22,6 @@ import os
import subprocess
import threading
-from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.platform import IS_WINDOWS
@@ -30,6 +29,8 @@ if not IS_WINDOWS:
# ignored to avoid flake complaining on Linux
from pwd import getpwnam # noqa
+from typing import TYPE_CHECKING
+
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.configuration import tmp_configuration_copy
@@ -37,6 +38,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.platform import getuser
+if TYPE_CHECKING:
+ from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
+
PYTHONPATH_VAR = "PYTHONPATH"
diff --git a/airflow/task/task_runner/cgroup_task_runner.py b/airflow/task/task_runner/cgroup_task_runner.py
index 14354453bc..7034058c5b 100644
--- a/airflow/task/task_runner/cgroup_task_runner.py
+++ b/airflow/task/task_runner/cgroup_task_runner.py
@@ -21,16 +21,19 @@ from __future__ import annotations
import datetime
import os
import uuid
+from typing import TYPE_CHECKING
import psutil
from cgroupspy import trees
-from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.operator_resources import Resources
from airflow.utils.platform import getuser
from airflow.utils.process_utils import reap_process_group
+if TYPE_CHECKING:
+ from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
+
class CgroupTaskRunner(BaseTaskRunner):
"""
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 9b506580e1..a2e4ec1319 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -20,17 +20,20 @@ from __future__ import annotations
import logging
import os
+from typing import TYPE_CHECKING
import psutil
from setproctitle import setproctitle
-from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.models.taskinstance import TaskReturnCode
from airflow.settings import CAN_FORK
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.process_utils import reap_process_group, set_new_process_group
+if TYPE_CHECKING:
+ from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
+
class StandardTaskRunner(BaseTaskRunner):
"""Standard runner for all tasks."""
diff --git a/airflow/template/templater.py b/airflow/template/templater.py
index 19eb3f5996..07aead8580 100644
--- a/airflow/template/templater.py
+++ b/airflow/template/templater.py
@@ -19,7 +19,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any, Collection, Iterable, Sequence
-from airflow.utils.context import Context
from airflow.utils.helpers import render_template_as_native, render_template_to_string
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import ResolveMixin
@@ -30,6 +29,7 @@ if TYPE_CHECKING:
from sqlalchemy.orm import Session
from airflow import DAG
+ from airflow.utils.context import Context
class Templater(LoggingMixin):
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index bd4c0db461..6b8c3dc773 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -20,12 +20,13 @@ from __future__ import annotations
from typing import TYPE_CHECKING
import attr
-from sqlalchemy.orm.session import Session
from airflow.exceptions import TaskNotFound
from airflow.utils.state import State
if TYPE_CHECKING:
+ from sqlalchemy.orm.session import Session
+
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py
index 0ea192c708..ef31ec25fe 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -20,7 +20,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import func, or_, select
-from sqlalchemy.orm import Session
from airflow.models.taskinstance import PAST_DEPENDS_MET, TaskInstance as TI
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
@@ -29,6 +28,8 @@ from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
from airflow.models.dagrun import DagRun
from airflow.models.operator import Operator
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index dbdf692e76..9731d2a7c2 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -25,8 +25,7 @@ from typing import TYPE_CHECKING, Iterator, NamedTuple
from sqlalchemy import and_, func, or_, select
from airflow.models.taskinstance import PAST_DEPENDS_MET
-from airflow.ti_deps.dep_context import DepContext
-from airflow.ti_deps.deps.base_ti_dep import BaseTIDep, TIDepStatus
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule as TR
@@ -35,6 +34,8 @@ if TYPE_CHECKING:
from sqlalchemy.sql.expression import ColumnOperators
from airflow.models.taskinstance import TaskInstance
+ from airflow.ti_deps.dep_context import DepContext
+ from airflow.ti_deps.deps.base_ti_dep import TIDepStatus
class _UpstreamTIStates(NamedTuple):