This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 12b8e4c5550 Move DagBag to airflow/dag_processing (#55139)
12b8e4c5550 is described below
commit 12b8e4c555091cd46adf1f8f1e642dc0642096d9
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Sep 24 13:09:26 2025 +0100
Move DagBag to airflow/dag_processing (#55139)
* Move DagBag to airflow/dag_processing
DagBag is not a DB model; it is the parser/collector used by the
dag processor. This change moves it to a more natural home under
airflow/dag_processing, reduces confusion between "DBDagBag" and "DagBag",
and helps avoid import tangles.
What changed:
- New module: airflow.dag_processing.dagbag now hosts DagBag,
  _capture_with_reraise, FileLoadStat, timeout, and sync_bag_to_db.
- Old location airflow.models.dagbag: trimmed to DB-facing helpers
  (DBDagBag, etc.) and given a __getattr__ shim for backward
  compatibility (a sketch of the shim pattern follows this list).
- Imports updated across the codebase.
- Pre-commit path allowlist updated to include the new file and remove
  the old path.
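For illustration, a minimal sketch of the module-level __getattr__ (PEP 562)
pattern used for the shim; the real shim, with these exact names, appears in
the airflow/models/dagbag.py hunk of the diff below:

    from typing import Any

    def __getattr__(name: str) -> Any:
        # Intercept lookups of names that moved to airflow.dag_processing.dagbag.
        if name in {"DagBag", "FileLoadStat", "timeout"}:
            import warnings

            warnings.warn(
                f"Importing {name} from airflow.models.dagbag is deprecated. "
                "Import from airflow.dag_processing.dagbag instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            # Import on demand to avoid import-time side effects.
            from airflow.dag_processing import dagbag as _dagbag

            return getattr(_dagbag, name)
        raise AttributeError(name)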
Deprecation & migration:
- Deprecated: from airflow.models.dagbag import DagBag
- Use instead: from airflow.dag_processing.dagbag import DagBag
- A deprecation warning is emitted via the shim; no functional behavior
  change is intended (see the example below).
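As a quick check of the migration path, a hedged example of what callers see
(module paths are from this commit; the warning-capture pattern is only a
demonstration):

    import warnings

    # Deprecated path: still resolves through the shim, but warns.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        from airflow.models.dagbag import DagBag  # noqa: F401

    assert any(issubclass(w.category, DeprecationWarning) for w in caught)

    # Preferred path: same class, no warning.
    from airflow.dag_processing.dagbag import DagBag  # noqa: F811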
Notes:
- No runtime logic changes to parsing or DAG discovery.
- Tests and CLI code updated to the new import path.
- sync_bag_to_db moved alongside DagBag to keep parsing and persistence
  in one place (a usage sketch follows these notes).
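For context, a minimal sketch of how the two now-colocated pieces compose,
parse first and then persist (the folder and bundle values below are
placeholders, not part of this commit):

    from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db

    # Parse DAG files from a folder; no database access happens here.
    bag = DagBag(dag_folder="/path/to/dags", include_examples=False)
    print(f"parsed {bag.size()} dags, {len(bag.import_errors)} import errors")

    # Persist serialized DAGs, import errors and warnings to the metadata DB.
    sync_bag_to_db(bag, bundle_name="example-bundle", bundle_version=None)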
* fixup! Move DagBag to airflow/dag_processing
* fixup! fixup! Move DagBag to airflow/dag_processing
* fixup! fixup! fixup! Move DagBag to airflow/dag_processing
* fixup! fixup! fixup! fixup! Move DagBag to airflow/dag_processing
* fixup! fixup! fixup! fixup! fixup! Move DagBag to airflow/dag_processing
* fixup! fixup! fixup! fixup! fixup! fixup! Move DagBag to airflow/dag_processing
* fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Move DagBag to airflow/dag_processing
* Remove get_dagbag method from DagBundlesManager
* Fix dagbag mock
* import DagBag from models instead of models.dagbag in providers
* fixup! import DagBag from models instead of models.dagbag in providers
* fix fab www-hash
* fixup! fix fab www-hash
* leave DagBag import to be from airflow.models.dagbag in init so it issues deprecation warning
* Use DBDagBag in fab type checking if in AF3.1+
* fix static checks
* Update fab hash
* fixup! Update fab hash
* fixup! fixup! Update fab hash
* fixup! fixup! fixup! Update fab hash
---
.pre-commit-config.yaml | 2 +-
airflow-core/docs/best-practices.rst | 2 +-
.../core_api/routes/public/dag_report.py | 2 +-
.../src/airflow/cli/commands/dag_command.py | 7 +-
.../airflow/{models => dag_processing}/dagbag.py | 112 +---
.../src/airflow/dag_processing/processor.py | 2 +-
airflow-core/src/airflow/models/dagbag.py | 665 +--------------------
airflow-core/src/airflow/utils/cli.py | 8 +-
airflow-core/tests/integration/otel/test_otel.py | 3 +-
.../tests/unit/always/test_example_dags.py | 3 +-
.../core_api/routes/public/test_backfills.py | 3 +-
.../core_api/routes/public/test_dag_report.py | 2 +-
.../core_api/routes/public/test_task_instances.py | 2 +-
.../tests/unit/cli/commands/test_dag_command.py | 11 +-
.../tests/unit/cli/commands/test_task_command.py | 3 +-
airflow-core/tests/unit/cli/conftest.py | 2 +-
.../tests/unit/core/test_impersonation_tests.py | 3 +-
.../tests/unit/dag_processing/test_manager.py | 3 +-
.../tests/unit/dag_processing/test_processor.py | 3 +-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 2 +-
airflow-core/tests/unit/models/test_dag.py | 2 +-
airflow-core/tests/unit/models/test_dagbag.py | 16 +-
airflow-core/tests/unit/models/test_dagcode.py | 2 +-
airflow-core/tests/unit/models/test_dagrun.py | 2 +-
.../tests/unit/models/test_serialized_dag.py | 2 +-
.../unit/serialization/test_dag_serialization.py | 2 +-
devel-common/src/tests_common/pytest_plugin.py | 10 +-
devel-common/src/tests_common/test_utils/db.py | 14 +-
.../fab/auth_manager/security_manager/override.py | 2 +-
.../airflow/providers/fab/www/airflow_flask_app.py | 7 +-
.../airflowDefaultTheme.ff5a35f322070b094aa2.css | 2 +-
.../dist/materialIcons.3e67dd6fbfcc4f3b5105.css | 2 +-
providers/fab/www-hash.txt | 2 +-
.../unit/openlineage/plugins/test_execution.py | 9 +-
.../providers/standard/sensors/external_task.py | 7 +-
.../standard/sensors/test_external_task_sensor.py | 4 +-
.../tests/unit/standard/sensors/test_time_delta.py | 8 +-
.../tests/unit/standard/sensors/test_weekday.py | 8 +-
.../unit/standard/utils/test_sensor_helper.py | 3 +-
.../src/airflow/sdk/execution_time/task_runner.py | 3 +-
40 files changed, 136 insertions(+), 811 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 91bfd6c3203..f4d06e216e1 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1580,6 +1580,7 @@ repos:
^airflow-core/src/airflow/dag_processing/collection\.py$|
^airflow-core/src/airflow/dag_processing/manager\.py$|
^airflow-core/src/airflow/dag_processing/processor\.py$|
+ ^airflow-core/src/airflow/dag_processing/dagbag\.py$|
^airflow-core/src/airflow/datasets/metadata\.py$|
^airflow-core/src/airflow/exceptions\.py$|
^airflow-core/src/airflow/executors/local_executor\.py$|
@@ -1593,7 +1594,6 @@ repos:
^airflow-core/src/airflow/models/baseoperator\.py$|
^airflow-core/src/airflow/models/connection\.py$|
^airflow-core/src/airflow/models/dag\.py$|
- ^airflow-core/src/airflow/models/dagbag\.py$|
^airflow-core/src/airflow/models/dagrun\.py$|
^airflow-core/src/airflow/models/deadline\.py$|
^airflow-core/src/airflow/models/expandinput\.py$|
diff --git a/airflow-core/docs/best-practices.rst b/airflow-core/docs/best-practices.rst
index a71b86aa00d..2145a453197 100644
--- a/airflow-core/docs/best-practices.rst
+++ b/airflow-core/docs/best-practices.rst
@@ -727,7 +727,7 @@ Unit tests ensure that there is no incorrect code in your Dag. You can write uni
import pytest
- from airflow.models import DagBag
+ from airflow.dag_processing.dagbag import DagBag
@pytest.fixture()
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_report.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_report.py
index 42539015b53..d709564923b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_report.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_report.py
@@ -34,7 +34,7 @@ from airflow.api_fastapi.core_api.security import (
ReadableDagsFilterDep,
requires_access_dag,
)
-from airflow.models.dagbag import DagBag
+from airflow.dag_processing.dagbag import DagBag
dag_report_router = AirflowRouter(tags=["DagReport"], prefix="/dagReports")
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py
index df7276d8643..135d5991eb8 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -37,11 +37,11 @@ from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.jobs.job import Job
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import DagModel, DagRun, TaskInstance
from airflow.models.dag import get_next_data_interval
-from airflow.models.dagbag import sync_bag_to_db
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils
@@ -364,7 +364,7 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
dagbag_import_errors = 0
dags_list = []
if args.local:
- from airflow.models.dagbag import DagBag
+ from airflow.dag_processing.dagbag import DagBag
# Get import errors from the local area
if args.bundle_name:
@@ -463,6 +463,7 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None:
if args.local:
# Get import errors from local areas
+
if args.bundle_name:
manager = DagBundlesManager()
validate_dag_bundle_arg(args.bundle_name)
diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/dag_processing/dagbag.py
similarity index 84%
copy from airflow-core/src/airflow/models/dagbag.py
copy to airflow-core/src/airflow/dag_processing/dagbag.py
index fc29cf99155..ec6f739ac19 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -18,7 +18,6 @@
from __future__ import annotations
import contextlib
-import hashlib
import importlib
import importlib.machinery
import importlib.util
@@ -33,9 +32,6 @@ from datetime import datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple
-from sqlalchemy import Column, String, inspect, select
-from sqlalchemy.orm import joinedload
-from sqlalchemy.orm.attributes import NO_VALUE
from tabulate import tabulate
from airflow import settings
@@ -45,6 +41,7 @@ from airflow.exceptions import (
AirflowClusterPolicyError,
AirflowClusterPolicySkipDag,
AirflowClusterPolicyViolation,
+ AirflowDagCycleException,
AirflowDagDuplicatedIdException,
AirflowException,
AirflowTaskTimeout,
@@ -52,9 +49,7 @@ from airflow.exceptions import (
)
from airflow.executors.executor_loader import ExecutorLoader
from airflow.listeners.listener import get_listener_manager
-from airflow.models.base import Base, StringID
-from airflow.models.dag_version import DagVersion
-from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
+from airflow.serialization.serialized_objects import LazyDeserializedDAG
from airflow.utils.docs import get_docs_url
from airflow.utils.file import (
correct_maybe_zipped,
@@ -66,20 +61,13 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.types import NOTSET
-try:
- from airflow.sdk.exceptions import AirflowDagCycleException
-except ImportError:
- from airflow.exceptions import AirflowDagCycleException # type: ignore[no-redef]
-
if TYPE_CHECKING:
from collections.abc import Generator
from sqlalchemy.orm import Session
from airflow import DAG
- from airflow.models import DagRun
from airflow.models.dagwarning import DagWarning
- from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.types import ArgNotSet
@@ -671,99 +659,3 @@ def sync_bag_to_db(
dagbag.dag_warnings,
session=session,
)
-
-
-class DBDagBag:
- """
- Internal class for retrieving and caching dags in the scheduler.
-
- :meta private:
- """
-
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag
- self.load_op_links = load_op_links
-
- def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
- serdag.load_op_links = self.load_op_links
- if dag := serdag.dag:
- self._dags[serdag.dag_version_id] = dag
- return dag
-
- def _get_dag(self, version_id: str, session: Session) -> SerializedDAG | None:
- if dag := self._dags.get(version_id):
- return dag
- dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
- if not dag_version:
- return None
- if not (serdag := dag_version.serialized_dag):
- return None
- return self._read_dag(serdag)
-
- @staticmethod
- def _version_from_dag_run(dag_run: DagRun, *, session: Session) -> DagVersion:
- if not dag_run.bundle_version:
- if dag_version := DagVersion.get_latest_version(dag_id=dag_run.dag_id, session=session):
- return dag_version
-
- # Check if created_dag_version relationship is already loaded to avoid DetachedInstanceError
- info = inspect(dag_run)
- if info.attrs.created_dag_version.loaded_value is not NO_VALUE:
- # Relationship is already loaded, safe to access
- return dag_run.created_dag_version
-
- # Relationship not loaded, fetch it explicitly from current session
- return session.get(DagVersion, dag_run.created_dag_version_id)
-
- def get_dag_for_run(self, dag_run: DagRun, session: Session) -> SerializedDAG | None:
- if version := self._version_from_dag_run(dag_run=dag_run, session=session):
- return self._get_dag(version_id=version.id, session=session)
- return None
-
- def iter_all_latest_version_dags(self, *, session: Session) -> Generator[SerializedDAG, None, None]:
- """Walk through all latest version dags available in the database."""
- from airflow.models.serialized_dag import SerializedDagModel
-
- for sdm in session.scalars(select(SerializedDagModel)):
- if dag := self._read_dag(sdm):
- yield dag
-
- def get_latest_version_of_dag(self, dag_id: str, *, session: Session) -> SerializedDAG | None:
- """Get the latest version of a dag by its id."""
- from airflow.models.serialized_dag import SerializedDagModel
-
- if not (serdag := SerializedDagModel.get(dag_id, session=session)):
- return None
- return self._read_dag(serdag)
-
-
-def generate_md5_hash(context):
- bundle_name = context.get_current_parameters()["bundle_name"]
- relative_fileloc = context.get_current_parameters()["relative_fileloc"]
- return hashlib.md5(f"{bundle_name}:{relative_fileloc}".encode()).hexdigest()
-
-
-class DagPriorityParsingRequest(Base):
- """Model to store the dag parsing requests that will be prioritized when parsing files."""
-
- __tablename__ = "dag_priority_parsing_request"
-
- # Adding a unique constraint to fileloc results in the creation of an index and we have a limitation
- # on the size of the string we can use in the index for MySQL DB. We also have to keep the fileloc
- # size consistent with other tables. This is a workaround to enforce the unique constraint.
- id = Column(String(32), primary_key=True, default=generate_md5_hash, onupdate=generate_md5_hash)
-
- bundle_name = Column(StringID(), nullable=False)
- # The location of the file containing the DAG object
- # Note: Do not depend on fileloc pointing to a file; in the case of a
- # packaged DAG, it will point to the subpath of the DAG within the
- # associated zip.
- relative_fileloc = Column(String(2000), nullable=False)
-
- def __init__(self, bundle_name: str, relative_fileloc: str) -> None:
- super().__init__()
- self.bundle_name = bundle_name
- self.relative_fileloc = relative_fileloc
-
- def __repr__(self) -> str:
- return f"<DagPriorityParsingRequest: bundle_name={self.bundle_name}
relative_fileloc={self.relative_fileloc}>"
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index a86d308c8c5..cd6f265179a 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -35,7 +35,7 @@ from airflow.callbacks.callback_requests import (
TaskCallbackRequest,
)
from airflow.configuration import conf
-from airflow.models.dagbag import DagBag
+from airflow.dag_processing.dagbag import DagBag
from airflow.sdk.execution_time.comms import (
ConnectionResult,
DeleteVariable,
diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py
index fc29cf99155..928e8e406d4 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -17,660 +17,24 @@
# under the License.
from __future__ import annotations
-import contextlib
import hashlib
-import importlib
-import importlib.machinery
-import importlib.util
-import os
-import signal
-import sys
-import textwrap
-import traceback
-import warnings
-import zipfile
-from datetime import datetime, timedelta
-from pathlib import Path
-from typing import TYPE_CHECKING, NamedTuple
+from typing import TYPE_CHECKING, Any
from sqlalchemy import Column, String, inspect, select
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.attributes import NO_VALUE
-from tabulate import tabulate
-
-from airflow import settings
-from airflow._shared.timezones import timezone
-from airflow.configuration import conf
-from airflow.exceptions import (
- AirflowClusterPolicyError,
- AirflowClusterPolicySkipDag,
- AirflowClusterPolicyViolation,
- AirflowDagDuplicatedIdException,
- AirflowException,
- AirflowTaskTimeout,
- UnknownExecutorException,
-)
-from airflow.executors.executor_loader import ExecutorLoader
-from airflow.listeners.listener import get_listener_manager
+
from airflow.models.base import Base, StringID
from airflow.models.dag_version import DagVersion
-from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
-from airflow.utils.docs import get_docs_url
-from airflow.utils.file import (
- correct_maybe_zipped,
- get_unique_dag_module_name,
- list_py_file_paths,
- might_contain_dag,
-)
-from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.types import NOTSET
-
-try:
- from airflow.sdk.exceptions import AirflowDagCycleException
-except ImportError:
- from airflow.exceptions import AirflowDagCycleException # type: ignore[no-redef]
if TYPE_CHECKING:
from collections.abc import Generator
from sqlalchemy.orm import Session
- from airflow import DAG
from airflow.models import DagRun
- from airflow.models.dagwarning import DagWarning
from airflow.models.serialized_dag import SerializedDagModel
- from airflow.utils.types import ArgNotSet
-
-
[email protected]
-def _capture_with_reraise() -> Generator[list[warnings.WarningMessage], None, None]:
- """Capture warnings in context and re-raise it on exit from the context manager."""
- captured_warnings = []
- try:
- with warnings.catch_warnings(record=True) as captured_warnings:
- yield captured_warnings
- finally:
- if captured_warnings:
- for cw in captured_warnings:
- warnings.warn_explicit(
- message=cw.message,
- category=cw.category,
- filename=cw.filename,
- lineno=cw.lineno,
- source=cw.source,
- )
-
-
-class FileLoadStat(NamedTuple):
- """
- Information about single file.
-
- :param file: Loaded file.
- :param duration: Time spent on process file.
- :param dag_num: Total number of DAGs loaded in this file.
- :param task_num: Total number of Tasks loaded in this file.
- :param dags: DAGs names loaded in this file.
- :param warning_num: Total number of warnings captured from processing this file.
- """
-
- file: str
- duration: timedelta
- dag_num: int
- task_num: int
- dags: str
- warning_num: int
-
-
[email protected]
-def timeout(seconds=1, error_message="Timeout"):
- import logging
-
- log = logging.getLogger(__name__)
- error_message = error_message + ", PID: " + str(os.getpid())
-
- def handle_timeout(signum, frame):
- """Log information and raises AirflowTaskTimeout."""
- log.error("Process timed out, PID: %s", str(os.getpid()))
- raise AirflowTaskTimeout(error_message)
-
- try:
- try:
- signal.signal(signal.SIGALRM, handle_timeout)
- signal.setitimer(signal.ITIMER_REAL, seconds)
- except ValueError:
- log.warning("timeout can't be used in the current context",
exc_info=True)
- yield
- finally:
- with contextlib.suppress(ValueError):
- signal.setitimer(signal.ITIMER_REAL, 0)
-
-
-def _validate_executor_fields(dag: DAG) -> None:
- for task in dag.tasks:
- if not task.executor:
- continue
- try:
- ExecutorLoader.lookup_executor_name_by_str(task.executor)
- except UnknownExecutorException:
- raise UnknownExecutorException(
- f"Task '{task.task_id}' specifies executor '{task.executor}',
which is not available. "
- "Make sure it is listed in your [core] executors
configuration, or update the task's "
- "executor to use one of the configured executors."
- )
-
-
-class DagBag(LoggingMixin):
- """
- A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings.
-
- Some possible setting are database to use as a backend and what executor
- to use to fire off tasks. This makes it easier to run distinct environments
- for say production and development, tests, or for different teams or security
- profiles. What would have been system level settings are now dagbag level so
- that one system can run multiple, independent settings sets.
-
- :param dag_folder: the folder to scan to find DAGs
- :param include_examples: whether to include the examples that ship
- with airflow or not
- :param safe_mode: when ``False``, scans all python modules for dags.
- When ``True`` uses heuristics (files containing ``DAG`` and ``airflow`` strings)
- to filter python modules to scan for dags.
- :param load_op_links: Should the extra operator link be loaded via plugins when
- de-serializing the DAG? This flag is set to False in Scheduler so that Extra Operator links
- are not loaded to not run User code in Scheduler.
- :param collect_dags: when True, collects dags during class initialization.
- :param known_pools: If not none, then generate warnings if a Task attempts to use an unknown pool.
- """
-
- def __init__(
- self,
- dag_folder: str | Path | None = None, # todo AIP-66: rename this to path
- include_examples: bool | ArgNotSet = NOTSET,
- safe_mode: bool | ArgNotSet = NOTSET,
- load_op_links: bool = True,
- collect_dags: bool = True,
- known_pools: set[str] | None = None,
- bundle_path: Path | None = None,
- ):
- super().__init__()
- self.bundle_path = bundle_path
- include_examples = (
- include_examples
- if isinstance(include_examples, bool)
- else conf.getboolean("core", "LOAD_EXAMPLES")
- )
- safe_mode = (
- safe_mode if isinstance(safe_mode, bool) else conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE")
- )
-
- dag_folder = dag_folder or settings.DAGS_FOLDER
- self.dag_folder = dag_folder
- self.dags: dict[str, DAG] = {}
- # the file's last modified timestamp when we last read it
- self.file_last_changed: dict[str, datetime] = {}
- # Store import errors with relative file paths as keys (relative to bundle_path)
- self.import_errors: dict[str, str] = {}
- self.captured_warnings: dict[str, tuple[str, ...]] = {}
- self.has_logged = False
- # Only used by SchedulerJob to compare the dag_hash to identify change in DAGs
- self.dags_hash: dict[str, str] = {}
-
- self.known_pools = known_pools
-
- self.dagbag_import_error_tracebacks = conf.getboolean("core", "dagbag_import_error_tracebacks")
- self.dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
- if collect_dags:
- self.collect_dags(
- dag_folder=dag_folder,
- include_examples=include_examples,
- safe_mode=safe_mode,
- )
- # Should the extra operator link be loaded via plugins?
- # This flag is set to False in Scheduler so that Extra Operator links are not loaded
- self.load_op_links = load_op_links
-
- def size(self) -> int:
- """:return: the amount of dags contained in this dagbag"""
- return len(self.dags)
-
- @property
- def dag_ids(self) -> list[str]:
- """
- Get DAG ids.
-
- :return: a list of DAG IDs in this bag
- """
- return list(self.dags)
-
- @provide_session
- def get_dag(self, dag_id, session: Session = None):
- """
- Get the DAG out of the dictionary, and refreshes it if expired.
-
- :param dag_id: DAG ID
- """
- # Avoid circular import
- from airflow.models.dag import DagModel
-
- dag = self.dags.get(dag_id)
-
- # If DAG Model is absent, we can't check last_expired property. Is the DAG not yet synchronized?
- if (orm_dag := DagModel.get_current(dag_id, session=session)) is None:
- return dag
-
- is_expired = (
- orm_dag.last_expired and dag and dag.last_loaded and dag.last_loaded < orm_dag.last_expired
- )
- if is_expired:
- # Remove associated dags so we can re-add them.
- self.dags.pop(dag_id, None)
- if dag is None or is_expired:
- # Reprocess source file.
- found_dags = self.process_file(
- filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
- )
-
- # If the source file no longer exports `dag_id`, delete it from self.dags
- if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
- return self.dags[dag_id]
- self.dags.pop(dag_id, None)
- return self.dags.get(dag_id)
-
- def process_file(self, filepath, only_if_updated=True, safe_mode=True):
- """Given a path to a python module or zip file, import the module and
look for dag objects within."""
- from airflow.sdk.definitions._internal.contextmanager import DagContext
-
- # if the source file no longer exists in the DB or in the filesystem,
- # return an empty list
- # todo: raise exception?
-
- if filepath is None or not os.path.isfile(filepath):
- return []
-
- try:
- # This failed before in what may have been a git sync
- # race condition
- file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
- if (
- only_if_updated
- and filepath in self.file_last_changed
- and file_last_changed_on_disk == self.file_last_changed[filepath]
- ):
- return []
- except Exception as e:
- self.log.exception(e)
- return []
-
- # Ensure we don't pick up anything else we didn't mean to
- DagContext.autoregistered_dags.clear()
-
- self.captured_warnings.pop(filepath, None)
- with _capture_with_reraise() as captured_warnings:
- if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
- mods = self._load_modules_from_file(filepath, safe_mode)
- else:
- mods = self._load_modules_from_zip(filepath, safe_mode)
-
- if captured_warnings:
- formatted_warnings = []
- for msg in captured_warnings:
- category = msg.category.__name__
- if (module := msg.category.__module__) != "builtins":
- category = f"{module}.{category}"
- formatted_warnings.append(f"{msg.filename}:{msg.lineno}: {category}: {msg.message}")
- self.captured_warnings[filepath] = tuple(formatted_warnings)
-
- found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)
-
- self.file_last_changed[filepath] = file_last_changed_on_disk
- return found_dags
-
- @property
- def dag_warnings(self) -> set[DagWarning]:
- """Get the set of DagWarnings for the bagged dags."""
- from airflow.models.dagwarning import DagWarning, DagWarningType
-
- # None means this feature is not enabled. Empty set means we don't know about any pools at all!
- if self.known_pools is None:
- return set()
-
- def get_pools(dag) -> dict[str, set[str]]:
- return {dag.dag_id: {task.pool for task in dag.tasks}}
-
- pool_dict: dict[str, set[str]] = {}
- for dag in self.dags.values():
- pool_dict.update(get_pools(dag))
-
- warnings: set[DagWarning] = set()
- for dag_id, dag_pools in pool_dict.items():
- nonexistent_pools = dag_pools - self.known_pools
- if nonexistent_pools:
- warnings.add(
- DagWarning(
- dag_id,
- DagWarningType.NONEXISTENT_POOL,
- f"Dag '{dag_id}' references non-existent pools:
{sorted(nonexistent_pools)!r}",
- )
- )
- return warnings
-
- def _get_relative_fileloc(self, filepath: str) -> str:
- """
- Get the relative file location for a given filepath.
-
- :param filepath: Absolute path to the file
- :return: Relative path from bundle_path, or original filepath if no bundle_path
- """
- if self.bundle_path:
- return str(Path(filepath).relative_to(self.bundle_path))
- return filepath
-
- def _load_modules_from_file(self, filepath, safe_mode):
- from airflow.sdk.definitions._internal.contextmanager import DagContext
-
- def handler(signum, frame):
- """Handle SIGSEGV signal and let the user know that the import
failed."""
- msg = f"Received SIGSEGV signal while processing {filepath}."
- self.log.error(msg)
- relative_filepath = self._get_relative_fileloc(filepath)
- self.import_errors[relative_filepath] = msg
-
- try:
- signal.signal(signal.SIGSEGV, handler)
- except ValueError:
- self.log.warning("SIGSEGV signal handler registration failed. Not
in the main thread")
-
- if not might_contain_dag(filepath, safe_mode):
- # Don't want to spam user with skip messages
- if not self.has_logged:
- self.has_logged = True
- self.log.info("File %s assumed to contain no DAGs. Skipping.",
filepath)
- return []
-
- self.log.debug("Importing %s", filepath)
- mod_name = get_unique_dag_module_name(filepath)
-
- if mod_name in sys.modules:
- del sys.modules[mod_name]
-
- DagContext.current_autoregister_module_name = mod_name
-
- def parse(mod_name, filepath):
- try:
- loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
- spec = importlib.util.spec_from_loader(mod_name, loader)
- new_module = importlib.util.module_from_spec(spec)
- sys.modules[spec.name] = new_module
- loader.exec_module(new_module)
- return [new_module]
- except KeyboardInterrupt:
- # re-raise ctrl-c
- raise
- except BaseException as e:
- # Normally you shouldn't catch BaseException, but in this case we want to, as, pytest.skip
- # raises an exception which does not inherit from Exception, and we want to catch that here.
- # This would also catch `exit()` in a dag file
- DagContext.autoregistered_dags.clear()
- self.log.exception("Failed to import: %s", filepath)
- relative_filepath = self._get_relative_fileloc(filepath)
- if self.dagbag_import_error_tracebacks:
- self.import_errors[relative_filepath] = traceback.format_exc(
- limit=-self.dagbag_import_error_traceback_depth
- )
- else:
- self.import_errors[relative_filepath] = str(e)
- return []
-
- dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
-
- if not isinstance(dagbag_import_timeout, (int, float)):
- raise TypeError(
- f"Value ({dagbag_import_timeout}) from
get_dagbag_import_timeout must be int or float"
- )
-
- if dagbag_import_timeout <= 0: # no parsing timeout
- return parse(mod_name, filepath)
-
- timeout_msg = (
- f"DagBag import timeout for {filepath} after
{dagbag_import_timeout}s.\n"
- "Please take a look at these docs to improve your DAG import
time:\n"
- f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
- f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
- )
- with timeout(dagbag_import_timeout, error_message=timeout_msg):
- return parse(mod_name, filepath)
-
- def _load_modules_from_zip(self, filepath, safe_mode):
- from airflow.sdk.definitions._internal.contextmanager import DagContext
-
- mods = []
- with zipfile.ZipFile(filepath) as current_zip_file:
- for zip_info in current_zip_file.infolist():
- zip_path = Path(zip_info.filename)
- if zip_path.suffix not in [".py", ".pyc"] or
len(zip_path.parts) > 1:
- continue
-
- if zip_path.stem == "__init__":
- self.log.warning("Found %s at root of %s", zip_path.name,
filepath)
-
- self.log.debug("Reading %s from %s", zip_info.filename,
filepath)
-
- if not might_contain_dag(zip_info.filename, safe_mode,
current_zip_file):
- # todo: create ignore list
- # Don't want to spam user with skip messages
- if not self.has_logged:
- self.has_logged = True
- self.log.info(
- "File %s:%s assumed to contain no DAGs.
Skipping.", filepath, zip_info.filename
- )
- continue
-
- mod_name = zip_path.stem
- if mod_name in sys.modules:
- del sys.modules[mod_name]
-
- DagContext.current_autoregister_module_name = mod_name
- try:
- sys.path.insert(0, filepath)
- current_module = importlib.import_module(mod_name)
- mods.append(current_module)
- except Exception as e:
- DagContext.autoregistered_dags.clear()
- fileloc = os.path.join(filepath, zip_info.filename)
- self.log.exception("Failed to import: %s", fileloc)
- relative_fileloc = self._get_relative_fileloc(fileloc)
- if self.dagbag_import_error_tracebacks:
- self.import_errors[relative_fileloc] = traceback.format_exc(
- limit=-self.dagbag_import_error_traceback_depth
- )
- else:
- self.import_errors[relative_fileloc] = str(e)
- finally:
- if sys.path[0] == filepath:
- del sys.path[0]
- return mods
-
- def _process_modules(self, filepath, mods, file_last_changed_on_disk):
- from airflow.sdk import DAG
- from airflow.sdk.definitions._internal.contextmanager import DagContext
-
- top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG)}
-
- top_level_dags.update(DagContext.autoregistered_dags)
-
- DagContext.current_autoregister_module_name = None
- DagContext.autoregistered_dags.clear()
-
- found_dags = []
-
- for dag, mod in top_level_dags:
- dag.fileloc = mod.__file__
- relative_fileloc = self._get_relative_fileloc(dag.fileloc)
- dag.relative_fileloc = relative_fileloc
- try:
- dag.validate()
- _validate_executor_fields(dag)
- self.bag_dag(dag=dag)
- except AirflowClusterPolicySkipDag:
- pass
- except Exception as e:
- self.log.exception("Failed to bag_dag: %s", dag.fileloc)
- self.import_errors[relative_fileloc] = f"{type(e).__name__}:
{e}"
- self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
- else:
- found_dags.append(dag)
- return found_dags
-
- def bag_dag(self, dag: DAG):
- """
- Add the DAG into the bag.
-
- :raises: AirflowDagCycleException if a cycle is detected.
- :raises: AirflowDagDuplicatedIdException if this dag already exists in the bag.
- """
- dag.check_cycle() # throws exception if a task cycle is found
-
- dag.resolve_template_files()
- dag.last_loaded = timezone.utcnow()
-
- try:
- # Check policies
- settings.dag_policy(dag)
-
- for task in dag.tasks:
- # The listeners are not supported when ending a task via a trigger on asynchronous operators.
- if getattr(task, "end_from_trigger", False) and get_listener_manager().has_listeners:
- raise AirflowException(
- "Listeners are not supported with end_from_trigger=True for deferrable operators. "
- "Task %s in DAG %s has end_from_trigger=True with listeners from plugins. "
- "Set end_from_trigger=False to use listeners.",
- task.task_id,
- dag.dag_id,
- )
-
- settings.task_policy(task)
- except (AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag):
- raise
- except Exception as e:
- self.log.exception(e)
- raise AirflowClusterPolicyError(e)
-
- try:
- prev_dag = self.dags.get(dag.dag_id)
- if prev_dag and prev_dag.fileloc != dag.fileloc:
- raise AirflowDagDuplicatedIdException(
- dag_id=dag.dag_id,
- incoming=dag.fileloc,
- existing=self.dags[dag.dag_id].fileloc,
- )
- self.dags[dag.dag_id] = dag
- self.log.debug("Loaded DAG %s", dag)
- except (AirflowDagCycleException, AirflowDagDuplicatedIdException):
- # There was an error in bagging the dag. Remove it from the list of dags
- self.log.exception("Exception bagging dag: %s", dag.dag_id)
- raise
-
- def collect_dags(
- self,
- dag_folder: str | Path | None = None,
- only_if_updated: bool = True,
- include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
- safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
- ):
- """
- Look for python modules in a given path, import them, and add them to the dagbag collection.
-
- Note that if a ``.airflowignore`` file is found while processing
- the directory, it will behave much like a ``.gitignore``,
- ignoring files that match any of the patterns specified
- in the file.
-
- **Note**: The patterns in ``.airflowignore`` are interpreted as either
- un-anchored regexes or gitignore-like glob expressions, depending on
- the ``DAG_IGNORE_FILE_SYNTAX`` configuration parameter.
- """
- self.log.info("Filling up the DagBag from %s", dag_folder)
- dag_folder = dag_folder or self.dag_folder
- # Used to store stats around DagBag processing
- stats = []
-
- # Ensure dag_folder is a str -- it may have been a pathlib.Path
- dag_folder = correct_maybe_zipped(str(dag_folder))
-
- files_to_parse = list_py_file_paths(dag_folder, safe_mode=safe_mode)
-
- if include_examples:
- from airflow import example_dags
-
- example_dag_folder = next(iter(example_dags.__path__))
-
- files_to_parse.extend(list_py_file_paths(example_dag_folder, safe_mode=safe_mode))
-
- for filepath in files_to_parse:
- try:
- file_parse_start_dttm = timezone.utcnow()
- found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)
-
- file_parse_end_dttm = timezone.utcnow()
- stats.append(
- FileLoadStat(
- file=filepath.replace(settings.DAGS_FOLDER, ""),
- duration=file_parse_end_dttm - file_parse_start_dttm,
- dag_num=len(found_dags),
- task_num=sum(len(dag.tasks) for dag in found_dags),
- dags=str([dag.dag_id for dag in found_dags]),
- warning_num=len(self.captured_warnings.get(filepath, [])),
- )
- )
- except Exception as e:
- self.log.exception(e)
-
- self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)
-
- def dagbag_report(self):
- """Print a report around DagBag loading stats."""
- stats = self.dagbag_stats
- dag_folder = self.dag_folder
- duration = sum((o.duration for o in stats), timedelta()).total_seconds()
- dag_num = sum(o.dag_num for o in stats)
- task_num = sum(o.task_num for o in stats)
- table = tabulate(stats, headers="keys")
-
- report = textwrap.dedent(
- f"""\n
- -------------------------------------------------------------------
- DagBag loading stats for {dag_folder}
- -------------------------------------------------------------------
- Number of DAGs: {dag_num}
- Total task number: {task_num}
- DagBag parsing time: {duration}\n{table}
- """
- )
- return report
-
-
-@provide_session
-def sync_bag_to_db(
- dagbag: DagBag,
- bundle_name: str,
- bundle_version: str | None,
- *,
- session: Session = NEW_SESSION,
-) -> None:
- """Save attributes about list of DAG to the DB."""
- from airflow.dag_processing.collection import update_dag_parsing_results_in_db
-
- import_errors = {(bundle_name, rel_path): error for rel_path, error in dagbag.import_errors.items()}
- update_dag_parsing_results_in_db(
- bundle_name,
- bundle_version,
- [LazyDeserializedDAG.from_dag(dag) for dag in dagbag.dags.values()],
- import_errors,
- None, # file parsing duration is not well defined when parsing multiple files / multiple DAGs.
- dagbag.dag_warnings,
- session=session,
- )
+ from airflow.serialization.serialized_objects import SerializedDAG
class DBDagBag:
@@ -767,3 +131,26 @@ class DagPriorityParsingRequest(Base):
def __repr__(self) -> str:
return f"<DagPriorityParsingRequest: bundle_name={self.bundle_name}
relative_fileloc={self.relative_fileloc}>"
+
+
+def __getattr__(name: str) -> Any:
+ """
+ Backwards-compat shim: importing DagBag from airflow.models.dagbag is deprecated.
+
+ Emits DeprecationWarning and re-exports DagBag from airflow.dag_processing.dagbag
+ to preserve compatibility for external callers.
+ """
+ if name in {"DagBag", "FileLoadStat", "timeout"}:
+ import warnings
+
+ warnings.warn(
+ f"Importing {name} from airflow.models.dagbag is deprecated and
will be removed in a future "
+ "release. Please import from airflow.dag_processing.dagbag
instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ # Import on demand to avoid import-time side effects
+ from airflow.dag_processing import dagbag as _dagbag
+
+ return getattr(_dagbag, name)
+ raise AttributeError(name)
diff --git a/airflow-core/src/airflow/utils/cli.py b/airflow-core/src/airflow/utils/cli.py
index b6423c5af3a..c96cbc005b1 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -36,6 +36,7 @@ from typing import TYPE_CHECKING, TypeVar, cast
from airflow import settings
from airflow._shared.timezones import timezone
from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag
from airflow.exceptions import AirflowException
from airflow.sdk.definitions._internal.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils import cli_action_loggers
@@ -230,7 +231,8 @@ def process_subdir(subdir: str | None):
def get_dag_by_file_location(dag_id: str):
"""Return DAG of a given dag_id by looking up file location."""
# TODO: AIP-66 - investigate more, can we use serdag?
- from airflow.models import DagBag, DagModel
+ from airflow.dag_processing.dagbag import DagBag
+ from airflow.models import DagModel
# Benefit is that logging from other dags in dagbag will not appear
dag_model = DagModel.get_current(dag_id)
@@ -271,7 +273,7 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | N
find the correct path (assuming it's a file) and failing that, use the configured
dags folder.
"""
- from airflow.models.dagbag import DagBag, sync_bag_to_db
+ from airflow.dag_processing.dagbag import sync_bag_to_db
manager = DagBundlesManager()
for bundle_name in bundle_names or ():
@@ -315,7 +317,7 @@ def get_db_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | None
def get_dags(bundle_names: list | None, dag_id: str, use_regex: bool = False, from_db: bool = False):
"""Return DAG(s) matching a given regex or dag_id."""
- from airflow.models import DagBag
+ from airflow.dag_processing.dagbag import DagBag
bundle_names = bundle_names or []
diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index 4dd9874f736..fab772935f7 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -28,9 +28,10 @@ from sqlalchemy import select
from airflow._shared.timezones import timezone
from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag
from airflow.executors import executor_loader
from airflow.executors.executor_utils import ExecutorName
-from airflow.models import DAG, DagBag, DagRun
+from airflow.models import DAG, DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.serialization.serialized_objects import SerializedDAG
diff --git a/airflow-core/tests/unit/always/test_example_dags.py b/airflow-core/tests/unit/always/test_example_dags.py
index 1744a3f88a4..feb62c52b6b 100644
--- a/airflow-core/tests/unit/always/test_example_dags.py
+++ b/airflow-core/tests/unit/always/test_example_dags.py
@@ -27,7 +27,8 @@ import pytest
from packaging.specifiers import SpecifierSet
from packaging.version import Version
-from airflow.models import Connection, DagBag
+from airflow.dag_processing.dagbag import DagBag
+from airflow.models import Connection
from airflow.sdk import BaseHook
from airflow.utils import yaml
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
index ead2710ad84..069208cd683 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
@@ -25,7 +25,8 @@ import pytest
from sqlalchemy import and_, func, select
from airflow._shared.timezones import timezone
-from airflow.models import DagBag, DagModel, DagRun
+from airflow.dag_processing.dagbag import DagBag
+from airflow.models import DagModel, DagRun
from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill
from airflow.models.dag import DAG
from airflow.models.dagbundle import DagBundleModel
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_report.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_report.py
index 3938894001d..b1d37ac3e42 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_report.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_report.py
@@ -95,7 +95,7 @@ class TestDagReportEndpoint:
def _mock_collect_dags(self, *args, **kwargs):
self.dagbag_stats = []
- with patch("airflow.models.dagbag.DagBag.collect_dags",
_mock_collect_dags):
+ with patch("airflow.dag_processing.dagbag.DagBag.collect_dags",
_mock_collect_dags):
response = test_client.get("/dagReports", params={"subdir":
TEST_DAG_FOLDER})
assert response.status_code == 200
assert response.json() == {"dag_reports": [], "total_entries": 0}
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 0f26235088a..0c06e83392c 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -30,12 +30,12 @@ from sqlalchemy import select
from airflow._shared.timezones.timezone import datetime
from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagRun, Log, TaskInstance
from airflow.models.dag_version import DagVersion
-from airflow.models.dagbag import DagBag, sync_bag_to_db
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.models.taskmap import TaskMap
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index f6cae39b720..10332a37a58 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -34,9 +34,10 @@ from airflow import settings
from airflow._shared.timezones import timezone
from airflow.cli import cli_parser
from airflow.cli.commands import dag_command
+from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun
-from airflow.models.dagbag import DBDagBag, sync_bag_to_db
+from airflow.models import DagModel, DagRun
+from airflow.models.dagbag import DBDagBag
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.sdk import BaseOperator, task
@@ -759,7 +760,7 @@ class TestCliDags:
mock_render_dag.assert_has_calls([mock.call(mock_get_dag.return_value, tis=[])])
assert "SOURCE" in output
- @mock.patch("airflow.models.dagbag.DagBag")
+ @mock.patch("airflow.utils.cli.DagBag")
def test_dag_test_with_bundle_name(self, mock_dagbag, configure_dag_bundles):
"""Test that DAG can be tested using bundle name."""
mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun(
@@ -786,7 +787,7 @@ class TestCliDags:
include_examples=False,
)
- @mock.patch("airflow.models.dagbag.DagBag")
+ @mock.patch("airflow.utils.cli.DagBag")
def test_dag_test_with_dagfile_path(self, mock_dagbag, configure_dag_bundles):
"""Test that DAG can be tested using dagfile path."""
mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun(
@@ -807,7 +808,7 @@ class TestCliDags:
include_examples=False,
)
- @mock.patch("airflow.models.dagbag.DagBag")
+ @mock.patch("airflow.utils.cli.DagBag")
def test_dag_test_with_both_bundle_and_dagfile_path(self, mock_dagbag, configure_dag_bundles):
"""Test that DAG can be tested using both bundle name and dagfile path."""
mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun(
diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py b/airflow-core/tests/unit/cli/commands/test_task_command.py
index 532324d06c6..1c7b227bb6f 100644
--- a/airflow-core/tests/unit/cli/commands/test_task_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_task_command.py
@@ -35,8 +35,9 @@ from airflow._shared.timezones import timezone
from airflow.cli import cli_parser
from airflow.cli.commands import task_command
from airflow.configuration import conf
+from airflow.dag_processing.dagbag import DagBag
from airflow.exceptions import DagRunNotFound
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import DagModel, DagRun, TaskInstance
from airflow.models.dag_version import DagVersion
from airflow.models.dagbag import DBDagBag
from airflow.models.serialized_dag import SerializedDagModel
diff --git a/airflow-core/tests/unit/cli/conftest.py b/airflow-core/tests/unit/cli/conftest.py
index d65ea767013..9c20c9d7e6a 100644
--- a/airflow-core/tests/unit/cli/conftest.py
+++ b/airflow-core/tests/unit/cli/conftest.py
@@ -21,8 +21,8 @@ import sys
import pytest
+from airflow.dag_processing.dagbag import DagBag
from airflow.executors import local_executor
-from airflow.models.dagbag import DagBag
from airflow.providers.celery.executors import celery_executor
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor
diff --git a/airflow-core/tests/unit/core/test_impersonation_tests.py b/airflow-core/tests/unit/core/test_impersonation_tests.py
index 62c8effb5b2..c4ca10d71df 100644
--- a/airflow-core/tests/unit/core/test_impersonation_tests.py
+++ b/airflow-core/tests/unit/core/test_impersonation_tests.py
@@ -29,7 +29,8 @@ import pytest
from airflow._shared.timezones.timezone import datetime
from airflow.configuration import conf
-from airflow.models import DagBag, TaskInstance
+from airflow.dag_processing.dagbag import DagBag
+from airflow.models import TaskInstance
from airflow.utils.db import add_default_pool_if_not_exists
from airflow.utils.state import State
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index c9748c5058c..1b023381bf5 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -42,13 +42,14 @@ from uuid6 import uuid7
from airflow._shared.timezones import timezone
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag
from airflow.dag_processing.manager import (
DagFileInfo,
DagFileProcessorManager,
DagFileStat,
)
from airflow.dag_processing.processor import DagFileProcessorProcess
-from airflow.models import DagBag, DagModel, DbCallbackRequest
+from airflow.models import DagModel, DbCallbackRequest
from airflow.models.asset import TaskOutletAssetReference
from airflow.models.dag_version import DagVersion
from airflow.models.dagbundle import DagBundleModel
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 8acba729f05..5d3d6622d69 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -47,6 +47,7 @@ from airflow.callbacks.callback_requests import (
EmailNotificationRequest,
TaskCallbackRequest,
)
+from airflow.dag_processing.dagbag import DagBag
from airflow.dag_processing.manager import process_parse_results
from airflow.dag_processing.processor import (
DagFileParseRequest,
@@ -58,7 +59,7 @@ from airflow.dag_processing.processor import (
_parse_file,
_pre_import_airflow_modules,
)
-from airflow.models import DagBag, DagRun
+from airflow.models import DagRun
from airflow.sdk import DAG, BaseOperator
from airflow.sdk.api.client import Client
from airflow.sdk.api.datamodels._generated import DagRunState
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index c6b1edfed8e..feceac3de32 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -44,6 +44,7 @@ from airflow.assets.manager import AssetManager
from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext, TaskCallbackRequest
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
from airflow.dag_processing.collection import AssetModelOperation, DagModelOperation
+from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_constants import MOCK_EXECUTOR
@@ -55,7 +56,6 @@ from airflow.models.asset import AssetActive, AssetAliasModel, AssetDagRunQueue,
from airflow.models.backfill import Backfill, _create_backfill
from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval
from airflow.models.dag_version import DagVersion
-from airflow.models.dagbag import DagBag, sync_bag_to_db
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 53cafab3543..ee0445bc869 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -37,8 +37,8 @@ from airflow import settings
from airflow._shared.timezones import timezone
from airflow._shared.timezones.timezone import datetime as datetime_tz
from airflow.configuration import conf
+from airflow.dag_processing.dagbag import DagBag
from airflow.exceptions import AirflowException, ParamValidationError
-from airflow.models import DagBag
from airflow.models.asset import (
AssetAliasModel,
AssetDagRunQueue,
diff --git a/airflow-core/tests/unit/models/test_dagbag.py b/airflow-core/tests/unit/models/test_dagbag.py
index 943d9f094b5..7ab82622cda 100644
--- a/airflow-core/tests/unit/models/test_dagbag.py
+++ b/airflow-core/tests/unit/models/test_dagbag.py
@@ -35,10 +35,10 @@ import pytest
from sqlalchemy import select
from airflow import settings
+from airflow.dag_processing.dagbag import DagBag, _capture_with_reraise, _validate_executor_fields
from airflow.exceptions import UnknownExecutorException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models.dag import DagModel
-from airflow.models.dagbag import DagBag, _capture_with_reraise, _validate_executor_fields
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.serialized_dag import SerializedDagModel
from airflow.sdk import DAG, BaseOperator
@@ -101,8 +101,8 @@ class TestDagBag:
"""Test that the timeout context manager raises AirflowTaskTimeout
when time limit is exceeded."""
import time
+ from airflow.dag_processing.dagbag import timeout
from airflow.exceptions import AirflowTaskTimeout
- from airflow.models.dagbag import timeout
with pytest.raises(AirflowTaskTimeout):
with timeout(1, "Test timeout"):
@@ -249,8 +249,8 @@ class TestDagBag:
assert sys.path == syspath_before # sys.path doesn't change
assert not dagbag.import_errors
- @patch("airflow.models.dagbag.timeout")
- @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+ @patch("airflow.dag_processing.dagbag.timeout")
+ @patch("airflow.dag_processing.dagbag.settings.get_dagbag_import_timeout")
def test_process_dag_file_without_timeout(
self, mocked_get_dagbag_import_timeout, mocked_timeout, tmp_path
):
@@ -268,8 +268,8 @@ class TestDagBag:
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_sensor.py"))
mocked_timeout.assert_not_called()
- @patch("airflow.models.dagbag.timeout")
- @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+ @patch("airflow.dag_processing.dagbag.timeout")
+ @patch("airflow.dag_processing.dagbag.settings.get_dagbag_import_timeout")
def test_process_dag_file_with_non_default_timeout(
self, mocked_get_dagbag_import_timeout, mocked_timeout, tmp_path
):
@@ -287,7 +287,7 @@ class TestDagBag:
mocked_timeout.assert_called_once_with(timeout_value, error_message=mock.ANY)
- @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+ @patch("airflow.dag_processing.dagbag.settings.get_dagbag_import_timeout")
def test_check_value_type_from_get_dagbag_import_timeout(
self, mocked_get_dagbag_import_timeout, tmp_path
):
@@ -861,7 +861,7 @@ with airflow.DAG(
"""
)
)
- with mock.patch("airflow.models.dagbag.signal.signal") as mock_signal:
+ with mock.patch("airflow.dag_processing.dagbag.signal.signal") as
mock_signal:
mock_signal.side_effect = ValueError("Invalid signal setting")
DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert "SIGSEGV signal handler registration failed. Not in the
main thread" in caplog.text
diff --git a/airflow-core/tests/unit/models/test_dagcode.py b/airflow-core/tests/unit/models/test_dagcode.py
index 14b733e13eb..b98499ca238 100644
--- a/airflow-core/tests/unit/models/test_dagcode.py
+++ b/airflow-core/tests/unit/models/test_dagcode.py
@@ -24,7 +24,7 @@ import pytest
from sqlalchemy.exc import IntegrityError
import airflow.example_dags as example_dags_module
-from airflow.models import DagBag
+from airflow.dag_processing.dagbag import DagBag
from airflow.models.dag_version import DagVersion
from airflow.models.dagcode import DagCode
from airflow.sdk import task as task_decorator
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index 73284f67356..3ec2eccef01 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -77,7 +77,7 @@ async def empty_callback_for_deadline():
@pytest.fixture(scope="module")
def dagbag():
- from airflow.models.dagbag import DagBag
+ from airflow.dag_processing.dagbag import DagBag
return DagBag(include_examples=True)
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index ba207cc39ec..e310836a391 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -26,10 +26,10 @@ import pytest
from sqlalchemy import func, select, update
import airflow.example_dags as example_dags_module
+from airflow.dag_processing.dagbag import DagBag
from airflow.models.asset import AssetActive, AssetAliasModel, AssetModel
from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
-from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel as SDM
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 6df1b225350..7fcc987cd46 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -46,6 +46,7 @@ from kubernetes.client import models as k8s
import airflow
from airflow._shared.timezones import timezone
+from airflow.dag_processing.dagbag import DagBag
from airflow.exceptions import (
AirflowException,
ParamValidationError,
@@ -53,7 +54,6 @@ from airflow.exceptions import (
)
from airflow.models.asset import AssetModel
from airflow.models.connection import Connection
-from airflow.models.dagbag import DagBag
from airflow.models.mappedoperator import MappedOperator
from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py
index a50dba08921..b7c074388b4 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -1175,7 +1175,7 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
def sync_dagbag_to_db(self):
if AIRFLOW_V_3_1_PLUS:
- from airflow.models.dagbag import sync_bag_to_db
+ from airflow.dag_processing.dagbag import sync_bag_to_db
sync_bag_to_db(self.dagbag, self.bundle_name, None)
elif AIRFLOW_V_3_0_PLUS:
@@ -1622,7 +1622,13 @@ def session():
def get_test_dag():
def _get(dag_id: str):
from airflow import settings
- from airflow.models.dagbag import DagBag
+
+ from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS
+
+ if AIRFLOW_V_3_1_PLUS:
+ from airflow.dag_processing.dagbag import DagBag
+ else:
+ from airflow.models.dagbag import DagBag # type: ignore[no-redef, attribute-defined]
from airflow.models.serialized_dag import SerializedDagModel
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
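
The version-gated import above is the compatibility pattern this commit applies
throughout the test helpers; as a standalone sketch (AIRFLOW_V_3_1_PLUS comes
from tests_common.test_utils.version_compat, exactly as in the hunk):

    from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS

    if AIRFLOW_V_3_1_PLUS:
        from airflow.dag_processing.dagbag import DagBag  # new location (Airflow >= 3.1)
    else:
        from airflow.models.dagbag import DagBag  # type: ignore[no-redef]  # old location
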
diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py
index 9e8f729a9f8..c9d964641e3 100644
--- a/devel-common/src/tests_common/test_utils/db.py
+++ b/devel-common/src/tests_common/test_utils/db.py
@@ -90,7 +90,10 @@ def _deactivate_unknown_dags(active_dag_ids, session):
def _bootstrap_dagbag():
- from airflow.models.dagbag import DagBag
+ if AIRFLOW_V_3_1_PLUS:
+ from airflow.dag_processing.dagbag import DagBag
+ else: # back-compat for Airflow <3.1
+ from airflow.models.dagbag import DagBag # type: ignore[no-redef, attribute-defined]
if AIRFLOW_V_3_0_PLUS:
from airflow.dag_processing.bundles.manager import DagBundlesManager
@@ -103,7 +106,7 @@ def _bootstrap_dagbag():
dagbag = DagBag()
# Save DAGs in the ORM
if AIRFLOW_V_3_1_PLUS:
- from airflow.models.dagbag import sync_bag_to_db
+ from airflow.dag_processing.dagbag import sync_bag_to_db
sync_bag_to_db(dagbag, bundle_name="dags-folder",
bundle_version=None, session=session)
elif AIRFLOW_V_3_0_PLUS:
@@ -163,7 +166,10 @@ def initial_db_init():
def parse_and_sync_to_db(folder: Path | str, include_examples: bool = False):
- from airflow.models.dagbag import DagBag
+ if AIRFLOW_V_3_1_PLUS:
+ from airflow.dag_processing.dagbag import DagBag
+ else:
+ from airflow.models.dagbag import DagBag # type: ignore[no-redef, attribute-defined]
if AIRFLOW_V_3_0_PLUS:
from airflow.dag_processing.bundles.manager import DagBundlesManager
@@ -175,7 +181,7 @@ def parse_and_sync_to_db(folder: Path | str, include_examples: bool = False):
dagbag = DagBag(dag_folder=folder, include_examples=include_examples)
if AIRFLOW_V_3_1_PLUS:
- from airflow.models.dagbag import sync_bag_to_db
+ from airflow.dag_processing.dagbag import sync_bag_to_db
sync_bag_to_db(dagbag, "dags-folder", None, session=session)
elif AIRFLOW_V_3_0_PLUS:
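
Taken together, the 3.1+ parse-then-persist flow in these helpers now reads as
below; a minimal sketch built only from the calls shown in this file's hunks
(the dags folder path is a placeholder):

    from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db

    dagbag = DagBag(dag_folder="/path/to/dags", include_examples=False)
    # Persist the parsed DAGs to the metadata DB under a bundle name; the
    # session keyword is optional, as the dag_maker hunk above shows.
    sync_bag_to_db(dagbag, "dags-folder", None)  # bag, bundle_name, bundle_version
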
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
index 488d57b66c4..193092322c3 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -127,7 +127,7 @@ if AIRFLOW_V_3_1_PLUS:
with create_session() as session:
yield from DBDagBag().iter_all_latest_version_dags(session=session)
else:
- from airflow.models.dagbag import DagBag
+ from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
def _iter_dags() -> Iterable[DAG | SerializedDAG]:
dagbag = DagBag(read_dags_from_db=True) # type: ignore[call-arg]
diff --git a/providers/fab/src/airflow/providers/fab/www/airflow_flask_app.py b/providers/fab/src/airflow/providers/fab/www/airflow_flask_app.py
index 8db3a1eb903..56c9f830cf3 100644
--- a/providers/fab/src/airflow/providers/fab/www/airflow_flask_app.py
+++ b/providers/fab/src/airflow/providers/fab/www/airflow_flask_app.py
@@ -21,7 +21,12 @@ from typing import TYPE_CHECKING, Any
from flask import Flask
if TYPE_CHECKING:
- from airflow.models.dagbag import DagBag
+ from airflow.providers.fab.version_compat import AIRFLOW_V_3_1_PLUS
+
+ if AIRFLOW_V_3_1_PLUS:
+ from airflow.models.dagbag import DBDagBag as DagBag
+ else:
+ from airflow.models.dagbag import DagBag # type: ignore[no-redef]
class AirflowApp(Flask):
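
The TYPE_CHECKING block above only feeds annotations, so it can alias the
DB-facing DBDagBag to the old name with no runtime import cost. A sketch of the
idea, using exactly the names from this hunk:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:  # evaluated by type checkers only, never at runtime
        from airflow.providers.fab.version_compat import AIRFLOW_V_3_1_PLUS

        if AIRFLOW_V_3_1_PLUS:
            from airflow.models.dagbag import DBDagBag as DagBag  # DB-facing bag on 3.1+
        else:
            from airflow.models.dagbag import DagBag  # type: ignore[no-redef]
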
diff --git a/providers/fab/src/airflow/providers/fab/www/static/dist/airflowDefaultTheme.ff5a35f322070b094aa2.css b/providers/fab/src/airflow/providers/fab/www/static/dist/airflowDefaultTheme.ff5a35f322070b094aa2.css
index 0e29eda7dac..dc37b46e63f 100644
--- a/providers/fab/src/airflow/providers/fab/www/static/dist/airflowDefaultTheme.ff5a35f322070b094aa2.css
+++ b/providers/fab/src/airflow/providers/fab/www/static/dist/airflowDefaultTheme.ff5a35f322070b094aa2.css
@@ -30,4 +30,4 @@
[regenerated minified theme stylesheet; wrapped and truncated CSS content elided; both versions lack a trailing newline]
diff --git a/providers/fab/src/airflow/providers/fab/www/static/dist/materialIcons.3e67dd6fbfcc4f3b5105.css b/providers/fab/src/airflow/providers/fab/www/static/dist/materialIcons.3e67dd6fbfcc4f3b5105.css
index c10472d9cd0..e4b7e396d3c 100644
--- a/providers/fab/src/airflow/providers/fab/www/static/dist/materialIcons.3e67dd6fbfcc4f3b5105.css
+++ b/providers/fab/src/airflow/providers/fab/www/static/dist/materialIcons.3e67dd6fbfcc4f3b5105.css
@@ -15,4 +15,4 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
[regenerated Material Icons @font-face rule; truncated base64 font data elided; both versions lack a trailing newline]
diff --git a/providers/fab/www-hash.txt b/providers/fab/www-hash.txt
index 2332f6b6405..19fcf7ec24b 100644
--- a/providers/fab/www-hash.txt
+++ b/providers/fab/www-hash.txt
@@ -1 +1 @@
-7a4ff69952843dfb3a2b46f1853ccfc797ae551aeed4ed5a30d96958ec476277
+86677c91d374f984777c7270fc1f43e65146f72ca63820ce68ec261f38afa8ff
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
index f0706667455..fb84004e7ee 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
@@ -28,7 +28,7 @@ import pytest
from airflow.jobs.job import Job
from airflow.listeners.listener import get_listener_manager
-from airflow.models import DagBag, TaskInstance
+from airflow.models import TaskInstance
from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain
from airflow.providers.openlineage.plugins.listener import OpenLineageListener
from airflow.utils import timezone
@@ -37,7 +37,12 @@ from airflow.utils.types import DagRunType
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
+
+if AIRFLOW_V_3_1_PLUS:
+ from airflow.dag_processing.dagbag import DagBag
+else:
+ from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
TEST_DAG_FOLDER = os.path.join(os.path.dirname(os.path.dirname(__file__)), "dags")
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index ffff3f85517..8b35d0ad665 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -25,7 +25,6 @@ from typing import TYPE_CHECKING, Any, ClassVar
from airflow.configuration import conf
from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DagModel
-from airflow.models.dagbag import DagBag
from airflow.providers.standard.exceptions import (
DuplicateStateError,
ExternalDagDeletedError,
@@ -41,6 +40,7 @@ from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.providers.standard.utils.sensor_helper import _get_count, _get_external_task_group_task_ids
from airflow.providers.standard.version_compat import (
AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_1_PLUS,
BaseOperator,
BaseOperatorLink,
BaseSensorOperator,
@@ -51,6 +51,11 @@ from airflow.utils.state import State, TaskInstanceState
if not AIRFLOW_V_3_0_PLUS:
from airflow.utils.session import NEW_SESSION, provide_session
+if AIRFLOW_V_3_1_PLUS:
+ from airflow.dag_processing.dagbag import DagBag
+else:
+ from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
+
if TYPE_CHECKING:
from sqlalchemy.orm import Session
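
For code outside the test helpers the migration is the same one-line import
change; provider modules gate on their own version_compat, as above. A hedged
before/after sketch (the assert uses the import_errors attribute exercised in
the test hunks earlier; the dags path is a placeholder):

    # Before (old location, kept importable for backward compatibility):
    # from airflow.models.dagbag import DagBag

    # After (new location on Airflow >= 3.1):
    from airflow.dag_processing.dagbag import DagBag

    dagbag = DagBag(dag_folder="/path/to/dags", include_examples=False)
    assert not dagbag.import_errors  # parse failures surface here
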
diff --git a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index db8c1c01cd0..a1897f5ffbf 100644
--- a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++ b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -27,7 +27,7 @@ import pytest
from airflow import settings
from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException, TaskDeferred
-from airflow.models import DagBag, DagRun, TaskInstance
+from airflow.models import DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.xcom_arg import XComArg
@@ -67,9 +67,11 @@ else:
from airflow.models import BaseOperator # type: ignore[assignment,no-redef]
if AIRFLOW_V_3_1_PLUS:
+ from airflow.dag_processing.dagbag import DagBag
from airflow.sdk import TaskGroup
from airflow.sdk.timezone import coerce_datetime, datetime
else:
+ from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
from airflow.utils.task_group import TaskGroup # type: ignore[no-redef]
from airflow.utils.timezone import coerce_datetime, datetime # type: ignore[attr-defined,no-redef]
diff --git a/providers/standard/tests/unit/standard/sensors/test_time_delta.py b/providers/standard/tests/unit/standard/sensors/test_time_delta.py
index 4c2ad8129b6..629783021c1 100644
--- a/providers/standard/tests/unit/standard/sensors/test_time_delta.py
+++ b/providers/standard/tests/unit/standard/sensors/test_time_delta.py
@@ -25,7 +25,6 @@ import pytest
import time_machine
from airflow.exceptions import AirflowProviderDeprecationWarning, TaskDeferred
-from airflow.models import DagBag
from airflow.models.dag import DAG
from airflow.providers.standard.sensors.time_delta import (
TimeDeltaSensor,
@@ -36,7 +35,12 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.utils.types import DagRunType
from tests_common.test_utils import db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, timezone
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, timezone
+
+if AIRFLOW_V_3_1_PLUS:
+ from airflow.dag_processing.dagbag import DagBag
+else:
+ from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
pytestmark = pytest.mark.db_test
diff --git a/providers/standard/tests/unit/standard/sensors/test_weekday.py b/providers/standard/tests/unit/standard/sensors/test_weekday.py
index 3f7fa41ea26..d789a085061 100644
--- a/providers/standard/tests/unit/standard/sensors/test_weekday.py
+++ b/providers/standard/tests/unit/standard/sensors/test_weekday.py
@@ -22,13 +22,17 @@ from datetime import timedelta
import pytest
from airflow.exceptions import AirflowSensorTimeout
-from airflow.models import DagBag
from airflow.models.dag import DAG
from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
from airflow.providers.standard.utils.weekday import WeekDay
from tests_common.test_utils import db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, timezone
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, timezone
+
+if AIRFLOW_V_3_1_PLUS:
+ from airflow.dag_processing.dagbag import DagBag
+else:
+ from airflow.models import DagBag # type: ignore[attr-defined, no-redef]
pytestmark = pytest.mark.db_test
diff --git a/providers/standard/tests/unit/standard/utils/test_sensor_helper.py b/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
index 156d2958a54..9265b292fcc 100644
--- a/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
+++ b/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
@@ -25,8 +25,7 @@ from unittest import mock
import pendulum
import pytest
-from airflow.models import DAG, TaskInstance
-from airflow.models.dagbag import DagBag
+from airflow.models import DAG, DagBag, TaskInstance
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.utils.sensor_helper import (
_count_stmt,
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 409982d1a6b..85eb7d514e2 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -602,8 +602,7 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str, value: Any) -> None:
def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
# TODO: Task-SDK:
# Using DagBag here is about 98% wrong, but it'll do for now
-
- from airflow.models.dagbag import DagBag
+ from airflow.dag_processing.dagbag import DagBag
bundle_info = what.bundle_info
bundle_instance = DagBundlesManager().get_bundle(