This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 015fef31bb Improve importing the modules in Airflow dag_processing,
datasets and example_dags packages (#33808)
015fef31bb is described below
commit 015fef31bb7f848b321d5287edeb6ff6ac63ab10
Author: Hussein Awala <[email protected]>
AuthorDate: Mon Aug 28 11:51:14 2023 +0200
Improve importing the modules in Airflow dag_processing, datasets and
example_dags packages (#33808)
* Improve importing the modules in Airflow dag_processing, datasets and
example_dags packages
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/dag_processing/manager.py | 9 ++++++---
airflow/dag_processing/processor.py | 16 ++++++++++------
airflow/datasets/manager.py | 5 +++--
airflow/example_dags/example_dag_decorator.py | 6 ++++--
airflow/example_dags/example_params_trigger_ui.py | 7 +++++--
airflow/example_dags/example_params_ui_tutorial.py | 7 +++++--
airflow/example_dags/example_skip_dag.py | 6 +++++-
airflow/example_dags/plugins/workday.py | 6 +++++-
8 files changed, 43 insertions(+), 19 deletions(-)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index 269ba769cb..3b26da2c68 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -33,12 +33,10 @@ import zipfile
from collections import defaultdict
from datetime import datetime, timedelta
from importlib import import_module
-from multiprocessing.connection import Connection as MultiprocessingConnection
from pathlib import Path
-from typing import Any, Callable, Iterator, NamedTuple, cast
+from typing import TYPE_CHECKING, Any, Callable, Iterator, NamedTuple, cast
from setproctitle import setproctitle
-from sqlalchemy.orm import Session
from tabulate import tabulate
import airflow.models
@@ -67,6 +65,11 @@ from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, skip_locked,
with_row_locks
+if TYPE_CHECKING:
+ from multiprocessing.connection import Connection as
MultiprocessingConnection
+
+ from sqlalchemy.orm import Session
+
class DagParsingStat(NamedTuple):
"""Information on processing progress."""
diff --git a/airflow/dag_processing/processor.py
b/airflow/dag_processing/processor.py
index ab17bad4da..1cb9c74a27 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -18,25 +18,21 @@ from __future__ import annotations
import importlib
import logging
-import multiprocessing
import os
import signal
import threading
import time
import zipfile
from contextlib import redirect_stderr, redirect_stdout, suppress
-from datetime import datetime, timedelta
-from multiprocessing.connection import Connection as MultiprocessingConnection
+from datetime import timedelta
from typing import TYPE_CHECKING, Iterable, Iterator
from setproctitle import setproctitle
from sqlalchemy import delete, exc, func, or_, select
-from sqlalchemy.orm.session import Session
from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import (
- CallbackRequest,
DagCallbackRequest,
SlaCallbackRequest,
TaskCallbackRequest,
@@ -44,7 +40,7 @@ from airflow.callbacks.callback_requests import (
from airflow.configuration import conf
from airflow.exceptions import AirflowException, TaskNotFound
from airflow.models import SlaMiss, errors
-from airflow.models.dag import DAG, DagModel
+from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.models.dagwarning import DagWarning, DagWarningType
@@ -59,6 +55,14 @@ from airflow.utils.session import NEW_SESSION,
provide_session
from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
+ import multiprocessing
+ from datetime import datetime
+ from multiprocessing.connection import Connection as
MultiprocessingConnection
+
+ from sqlalchemy.orm.session import Session
+
+ from airflow.callbacks.callback_requests import CallbackRequest
+ from airflow.models.dag import DAG
from airflow.models.operator import Operator
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index c7e062803a..8714ba0658 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -20,15 +20,16 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import exc, select
-from sqlalchemy.orm.session import Session
from airflow.configuration import conf
-from airflow.datasets import Dataset
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent,
DatasetModel
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
+ from sqlalchemy.orm.session import Session
+
+ from airflow.datasets import Dataset
from airflow.models.taskinstance import TaskInstance
diff --git a/airflow/example_dags/example_dag_decorator.py
b/airflow/example_dags/example_dag_decorator.py
index e8ee8a7299..447b4471b9 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations
-from typing import Any
+from typing import TYPE_CHECKING, Any
import httpx
import pendulum
@@ -25,7 +25,9 @@ import pendulum
from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator
from airflow.operators.email import EmailOperator
-from airflow.utils.context import Context
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
class GetRequestOperator(BaseOperator):
diff --git a/airflow/example_dags/example_params_trigger_ui.py
b/airflow/example_dags/example_params_trigger_ui.py
index 564861f52b..1ca0a19803 100644
--- a/airflow/example_dags/example_params_trigger_ui.py
+++ b/airflow/example_dags/example_params_trigger_ui.py
@@ -23,14 +23,17 @@ from __future__ import annotations
import datetime
from pathlib import Path
+from typing import TYPE_CHECKING
from airflow import DAG
from airflow.decorators import task
-from airflow.models.dagrun import DagRun
from airflow.models.param import Param
-from airflow.models.taskinstance import TaskInstance
from airflow.utils.trigger_rule import TriggerRule
+if TYPE_CHECKING:
+ from airflow.models.dagrun import DagRun
+ from airflow.models.taskinstance import TaskInstance
+
with DAG(
dag_id=Path(__file__).stem,
description=__doc__.partition(".")[0],
diff --git a/airflow/example_dags/example_params_ui_tutorial.py
b/airflow/example_dags/example_params_ui_tutorial.py
index 9af4e0ba4a..a9da876bf2 100644
--- a/airflow/example_dags/example_params_ui_tutorial.py
+++ b/airflow/example_dags/example_params_ui_tutorial.py
@@ -25,13 +25,16 @@ from __future__ import annotations
import datetime
import json
from pathlib import Path
+from typing import TYPE_CHECKING
from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
-from airflow.models.dagrun import DagRun
from airflow.models.param import Param
-from airflow.models.taskinstance import TaskInstance
+
+if TYPE_CHECKING:
+ from airflow.models.dagrun import DagRun
+ from airflow.models.taskinstance import TaskInstance
with DAG(
dag_id=Path(__file__).stem,
diff --git a/airflow/example_dags/example_skip_dag.py
b/airflow/example_dags/example_skip_dag.py
index 7723d9f9c7..ced2f6ec13 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -18,15 +18,19 @@
"""Example DAG demonstrating the EmptyOperator and a custom EmptySkipOperator
which skips by default."""
from __future__ import annotations
+from typing import TYPE_CHECKING
+
import pendulum
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.models.baseoperator import BaseOperator
from airflow.operators.empty import EmptyOperator
-from airflow.utils.context import Context
from airflow.utils.trigger_rule import TriggerRule
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
# Create some placeholder operators
class EmptySkipOperator(BaseOperator):
diff --git a/airflow/example_dags/plugins/workday.py
b/airflow/example_dags/plugins/workday.py
index 20363a69e7..79473e06dd 100644
--- a/airflow/example_dags/plugins/workday.py
+++ b/airflow/example_dags/plugins/workday.py
@@ -20,12 +20,16 @@ from __future__ import annotations
import logging
from datetime import timedelta
+from typing import TYPE_CHECKING
# [START howto_timetable]
from pendulum import UTC, Date, DateTime, Time
from airflow.plugins_manager import AirflowPlugin
-from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
+from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+
+if TYPE_CHECKING:
+ from airflow.timetables.base import TimeRestriction
log = logging.getLogger(__name__)
try: