This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 12b8e4c5550 Move DagBag to airflow/dag_processing (#55139)
12b8e4c5550 is described below

commit 12b8e4c555091cd46adf1f8f1e642dc0642096d9
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Sep 24 13:09:26 2025 +0100

    Move DagBag to airflow/dag_processing (#55139)
    
    * Move DagBag to airflow/dag_processing
    
    DagBag is not a DB model; it is the parser/collector used by the
    dag processor. This change moves it to a more natural home under
    airflow/dag_processing, reduces confusion between "DBDagBag" and "DagBag",
    and helps avoid import tangles.
    
    What changed:
    
    New module: airflow.dag_processing.dagbag now hosts
    DagBag, _capture_with_reraise, FileLoadStat, timeout, and sync_bag_to_db.
    
    Old location airflow.models.dagbag:
    trimmed to DB-facing helpers (DBDagBag, etc.) and given a __getattr__
    shim for backward compatibility (see the sketch below).
    
    Imports updated across the codebase.
    
    Pre-commit path allowlist updated to include the new file and remove
    the old path.
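    
    A minimal sketch of the module-level __getattr__ shim used for the
    backward-compatible re-export (the full version appears in the
    airflow/models/dagbag.py hunk below):
    
        # Backward-compat shim sketch (mirrors airflow/models/dagbag.py).
        from typing import Any
    
    
        def __getattr__(name: str) -> Any:
            # Called only for names not found in this module's namespace.
            if name in {"DagBag", "FileLoadStat", "timeout"}:
                import warnings
    
                warnings.warn(
                    f"Importing {name} from airflow.models.dagbag is deprecated. "
                    "Please import from airflow.dag_processing.dagbag instead.",
                    DeprecationWarning,
                    stacklevel=2,
                )
                # Import on demand to avoid import-time side effects.
                from airflow.dag_processing import dagbag as _dagbag
    
                return getattr(_dagbag, name)
            raise AttributeError(name)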
    
    Deprecation & migration
    
    Deprecated: from airflow.models.dagbag import DagBag
    
    Use instead: from airflow.dag_processing.dagbag import DagBag
    
    A deprecation warning is emitted via the shim; no functional behavior
    change is intended.
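    
    A quick check of both paths (a sketch, assuming pytest; the test name is
    hypothetical):
    
        import pytest
    
    
        def test_dagbag_import_paths():
            # Old path still resolves via the shim, but emits a warning.
            with pytest.deprecated_call():
                from airflow.models.dagbag import DagBag  # noqa: F401
            # New canonical path imports cleanly.
            from airflow.dag_processing.dagbag import DagBag  # noqa: F401,F811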
    
    Notes
    
    No runtime logic changes to parsing or DAG discovery.
    
    Tests and CLI code updated to the new import path.
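    
    Mock patch targets follow the import site: modules that now do
    "from airflow.dag_processing.dagbag import DagBag" must be patched where
    the name is looked up (a sketch based on the test changes below):
    
        from unittest import mock
    
        # Patch the name where it is used (airflow.utils.cli), not where it
        # is defined.
        with mock.patch("airflow.utils.cli.DagBag") as mock_dagbag:
            mock_dagbag.return_value.get_dag.return_value.test.return_value = None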
    
    sync_bag_to_db moved alongside DagBag to keep parsing + persistence
    in one place.
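    
    Parsing and persistence now read as one flow (a sketch; the dag folder and
    bundle name are placeholder values):
    
        from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
    
        # Parse DAG files from a folder without touching the DB.
        dagbag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
    
        # Persist the parse results (serialized DAGs, import errors, warnings)
        # for a bundle; the DB session is provided automatically.
        sync_bag_to_db(dagbag, bundle_name="example-bundle", bundle_version=None)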
    
    * Remove get_dagbag method from DagBundlesManager
    
    * Fix dagbag mock
    
    * import DagBag from models instead of models.dagbag in providers
    
    * fix fab www-hash
    
    * leave DagBag import to be from airflow.models.dagbag in init so it issues deprecation warning
    
    * Use DBDagBag in fab type checking if in AF3.1+
    
    * fix static checks
    
    * Update fab hash
    
---
 .pre-commit-config.yaml                            |   2 +-
 airflow-core/docs/best-practices.rst               |   2 +-
 .../core_api/routes/public/dag_report.py           |   2 +-
 .../src/airflow/cli/commands/dag_command.py        |   7 +-
 .../airflow/{models => dag_processing}/dagbag.py   | 112 +---
 .../src/airflow/dag_processing/processor.py        |   2 +-
 airflow-core/src/airflow/models/dagbag.py          | 665 +--------------------
 airflow-core/src/airflow/utils/cli.py              |   8 +-
 airflow-core/tests/integration/otel/test_otel.py   |   3 +-
 .../tests/unit/always/test_example_dags.py         |   3 +-
 .../core_api/routes/public/test_backfills.py       |   3 +-
 .../core_api/routes/public/test_dag_report.py      |   2 +-
 .../core_api/routes/public/test_task_instances.py  |   2 +-
 .../tests/unit/cli/commands/test_dag_command.py    |  11 +-
 .../tests/unit/cli/commands/test_task_command.py   |   3 +-
 airflow-core/tests/unit/cli/conftest.py            |   2 +-
 .../tests/unit/core/test_impersonation_tests.py    |   3 +-
 .../tests/unit/dag_processing/test_manager.py      |   3 +-
 .../tests/unit/dag_processing/test_processor.py    |   3 +-
 airflow-core/tests/unit/jobs/test_scheduler_job.py |   2 +-
 airflow-core/tests/unit/models/test_dag.py         |   2 +-
 airflow-core/tests/unit/models/test_dagbag.py      |  16 +-
 airflow-core/tests/unit/models/test_dagcode.py     |   2 +-
 airflow-core/tests/unit/models/test_dagrun.py      |   2 +-
 .../tests/unit/models/test_serialized_dag.py       |   2 +-
 .../unit/serialization/test_dag_serialization.py   |   2 +-
 devel-common/src/tests_common/pytest_plugin.py     |  10 +-
 devel-common/src/tests_common/test_utils/db.py     |  14 +-
 .../fab/auth_manager/security_manager/override.py  |   2 +-
 .../airflow/providers/fab/www/airflow_flask_app.py |   7 +-
 .../airflowDefaultTheme.ff5a35f322070b094aa2.css   |   2 +-
 .../dist/materialIcons.3e67dd6fbfcc4f3b5105.css    |   2 +-
 providers/fab/www-hash.txt                         |   2 +-
 .../unit/openlineage/plugins/test_execution.py     |   9 +-
 .../providers/standard/sensors/external_task.py    |   7 +-
 .../standard/sensors/test_external_task_sensor.py  |   4 +-
 .../tests/unit/standard/sensors/test_time_delta.py |   8 +-
 .../tests/unit/standard/sensors/test_weekday.py    |   8 +-
 .../unit/standard/utils/test_sensor_helper.py      |   3 +-
 .../src/airflow/sdk/execution_time/task_runner.py  |   3 +-
 40 files changed, 136 insertions(+), 811 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 91bfd6c3203..f4d06e216e1 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1580,6 +1580,7 @@ repos:
           ^airflow-core/src/airflow/dag_processing/collection\.py$|
           ^airflow-core/src/airflow/dag_processing/manager\.py$|
           ^airflow-core/src/airflow/dag_processing/processor\.py$|
+          ^airflow-core/src/airflow/dag_processing/dagbag\.py$|
           ^airflow-core/src/airflow/datasets/metadata\.py$|
           ^airflow-core/src/airflow/exceptions\.py$|
           ^airflow-core/src/airflow/executors/local_executor\.py$|
@@ -1593,7 +1594,6 @@ repos:
           ^airflow-core/src/airflow/models/baseoperator\.py$|
           ^airflow-core/src/airflow/models/connection\.py$|
           ^airflow-core/src/airflow/models/dag\.py$|
-          ^airflow-core/src/airflow/models/dagbag\.py$|
           ^airflow-core/src/airflow/models/dagrun\.py$|
           ^airflow-core/src/airflow/models/deadline\.py$|
           ^airflow-core/src/airflow/models/expandinput\.py$|
diff --git a/airflow-core/docs/best-practices.rst b/airflow-core/docs/best-practices.rst
index a71b86aa00d..2145a453197 100644
--- a/airflow-core/docs/best-practices.rst
+++ b/airflow-core/docs/best-practices.rst
@@ -727,7 +727,7 @@ Unit tests ensure that there is no incorrect code in your Dag. You can write uni
 
     import pytest
 
-    from airflow.models import DagBag
+    from airflow.dag_processing.dagbag import DagBag
 
 
     @pytest.fixture()
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_report.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_report.py
index 42539015b53..d709564923b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_report.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_report.py
@@ -34,7 +34,7 @@ from airflow.api_fastapi.core_api.security import (
     ReadableDagsFilterDep,
     requires_access_dag,
 )
-from airflow.models.dagbag import DagBag
+from airflow.dag_processing.dagbag import DagBag
 
 dag_report_router = AirflowRouter(tags=["DagReport"], prefix="/dagReports")
 
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py
index df7276d8643..135d5991eb8 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -37,11 +37,11 @@ from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
 from airflow.cli.simple_table import AirflowConsole
 from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
 from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
 from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.jobs.job import Job
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import DagModel, DagRun, TaskInstance
 from airflow.models.dag import get_next_data_interval
-from airflow.models.dagbag import sync_bag_to_db
 from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import cli as cli_utils
@@ -364,7 +364,7 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
     dagbag_import_errors = 0
     dags_list = []
     if args.local:
-        from airflow.models.dagbag import DagBag
+        from airflow.dag_processing.dagbag import DagBag
 
         # Get import errors from the local area
         if args.bundle_name:
@@ -463,6 +463,7 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None:
 
     if args.local:
         # Get import errors from local areas
+
         if args.bundle_name:
             manager = DagBundlesManager()
             validate_dag_bundle_arg(args.bundle_name)
diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/dag_processing/dagbag.py
similarity index 84%
copy from airflow-core/src/airflow/models/dagbag.py
copy to airflow-core/src/airflow/dag_processing/dagbag.py
index fc29cf99155..ec6f739ac19 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import contextlib
-import hashlib
 import importlib
 import importlib.machinery
 import importlib.util
@@ -33,9 +32,6 @@ from datetime import datetime, timedelta
 from pathlib import Path
 from typing import TYPE_CHECKING, NamedTuple
 
-from sqlalchemy import Column, String, inspect, select
-from sqlalchemy.orm import joinedload
-from sqlalchemy.orm.attributes import NO_VALUE
 from tabulate import tabulate
 
 from airflow import settings
@@ -45,6 +41,7 @@ from airflow.exceptions import (
     AirflowClusterPolicyError,
     AirflowClusterPolicySkipDag,
     AirflowClusterPolicyViolation,
+    AirflowDagCycleException,
     AirflowDagDuplicatedIdException,
     AirflowException,
     AirflowTaskTimeout,
@@ -52,9 +49,7 @@ from airflow.exceptions import (
 )
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.listeners.listener import get_listener_manager
-from airflow.models.base import Base, StringID
-from airflow.models.dag_version import DagVersion
-from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
+from airflow.serialization.serialized_objects import LazyDeserializedDAG
 from airflow.utils.docs import get_docs_url
 from airflow.utils.file import (
     correct_maybe_zipped,
@@ -66,20 +61,13 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.types import NOTSET
 
-try:
-    from airflow.sdk.exceptions import AirflowDagCycleException
-except ImportError:
-    from airflow.exceptions import AirflowDagCycleException  # type: ignore[no-redef]
-
 if TYPE_CHECKING:
     from collections.abc import Generator
 
     from sqlalchemy.orm import Session
 
     from airflow import DAG
-    from airflow.models import DagRun
     from airflow.models.dagwarning import DagWarning
-    from airflow.models.serialized_dag import SerializedDagModel
     from airflow.utils.types import ArgNotSet
 
 
@@ -671,99 +659,3 @@ def sync_bag_to_db(
         dagbag.dag_warnings,
         session=session,
     )
-
-
-class DBDagBag:
-    """
-    Internal class for retrieving and caching dags in the scheduler.
-
-    :meta private:
-    """
-
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[str, SerializedDAG] = {}  # dag_version_id to dag
-        self.load_op_links = load_op_links
-
-    def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
-        serdag.load_op_links = self.load_op_links
-        if dag := serdag.dag:
-            self._dags[serdag.dag_version_id] = dag
-        return dag
-
-    def _get_dag(self, version_id: str, session: Session) -> SerializedDAG | None:
-        if dag := self._dags.get(version_id):
-            return dag
-        dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
-        if not dag_version:
-            return None
-        if not (serdag := dag_version.serialized_dag):
-            return None
-        return self._read_dag(serdag)
-
-    @staticmethod
-    def _version_from_dag_run(dag_run: DagRun, *, session: Session) -> DagVersion:
-        if not dag_run.bundle_version:
-            if dag_version := DagVersion.get_latest_version(dag_id=dag_run.dag_id, session=session):
-                return dag_version
-
-        # Check if created_dag_version relationship is already loaded to avoid DetachedInstanceError
-        info = inspect(dag_run)
-        if info.attrs.created_dag_version.loaded_value is not NO_VALUE:
-            # Relationship is already loaded, safe to access
-            return dag_run.created_dag_version
-
-        # Relationship not loaded, fetch it explicitly from current session
-        return session.get(DagVersion, dag_run.created_dag_version_id)
-
-    def get_dag_for_run(self, dag_run: DagRun, session: Session) -> SerializedDAG | None:
-        if version := self._version_from_dag_run(dag_run=dag_run, session=session):
-            return self._get_dag(version_id=version.id, session=session)
-        return None
-
-    def iter_all_latest_version_dags(self, *, session: Session) -> Generator[SerializedDAG, None, None]:
-        """Walk through all latest version dags available in the database."""
-        from airflow.models.serialized_dag import SerializedDagModel
-
-        for sdm in session.scalars(select(SerializedDagModel)):
-            if dag := self._read_dag(sdm):
-                yield dag
-
-    def get_latest_version_of_dag(self, dag_id: str, *, session: Session) -> SerializedDAG | None:
-        """Get the latest version of a dag by its id."""
-        from airflow.models.serialized_dag import SerializedDagModel
-
-        if not (serdag := SerializedDagModel.get(dag_id, session=session)):
-            return None
-        return self._read_dag(serdag)
-
-
-def generate_md5_hash(context):
-    bundle_name = context.get_current_parameters()["bundle_name"]
-    relative_fileloc = context.get_current_parameters()["relative_fileloc"]
-    return hashlib.md5(f"{bundle_name}:{relative_fileloc}".encode()).hexdigest()
-
-
-class DagPriorityParsingRequest(Base):
-    """Model to store the dag parsing requests that will be prioritized when parsing files."""
-
-    __tablename__ = "dag_priority_parsing_request"
-
-    # Adding a unique constraint to fileloc results in the creation of an index and we have a limitation
-    # on the size of the string we can use in the index for MySQL DB. We also have to keep the fileloc
-    # size consistent with other tables. This is a workaround to enforce the unique constraint.
-    id = Column(String(32), primary_key=True, default=generate_md5_hash, onupdate=generate_md5_hash)
-
-    bundle_name = Column(StringID(), nullable=False)
-    # The location of the file containing the DAG object
-    # Note: Do not depend on fileloc pointing to a file; in the case of a
-    # packaged DAG, it will point to the subpath of the DAG within the
-    # associated zip.
-    relative_fileloc = Column(String(2000), nullable=False)
-
-    def __init__(self, bundle_name: str, relative_fileloc: str) -> None:
-        super().__init__()
-        self.bundle_name = bundle_name
-        self.relative_fileloc = relative_fileloc
-
-    def __repr__(self) -> str:
-        return f"<DagPriorityParsingRequest: bundle_name={self.bundle_name} relative_fileloc={self.relative_fileloc}>"
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index a86d308c8c5..cd6f265179a 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -35,7 +35,7 @@ from airflow.callbacks.callback_requests import (
     TaskCallbackRequest,
 )
 from airflow.configuration import conf
-from airflow.models.dagbag import DagBag
+from airflow.dag_processing.dagbag import DagBag
 from airflow.sdk.execution_time.comms import (
     ConnectionResult,
     DeleteVariable,
diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py
index fc29cf99155..928e8e406d4 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -17,660 +17,24 @@
 # under the License.
 from __future__ import annotations
 
-import contextlib
 import hashlib
-import importlib
-import importlib.machinery
-import importlib.util
-import os
-import signal
-import sys
-import textwrap
-import traceback
-import warnings
-import zipfile
-from datetime import datetime, timedelta
-from pathlib import Path
-from typing import TYPE_CHECKING, NamedTuple
+from typing import TYPE_CHECKING, Any
 
 from sqlalchemy import Column, String, inspect, select
 from sqlalchemy.orm import joinedload
 from sqlalchemy.orm.attributes import NO_VALUE
-from tabulate import tabulate
-
-from airflow import settings
-from airflow._shared.timezones import timezone
-from airflow.configuration import conf
-from airflow.exceptions import (
-    AirflowClusterPolicyError,
-    AirflowClusterPolicySkipDag,
-    AirflowClusterPolicyViolation,
-    AirflowDagDuplicatedIdException,
-    AirflowException,
-    AirflowTaskTimeout,
-    UnknownExecutorException,
-)
-from airflow.executors.executor_loader import ExecutorLoader
-from airflow.listeners.listener import get_listener_manager
+
 from airflow.models.base import Base, StringID
 from airflow.models.dag_version import DagVersion
-from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
-from airflow.utils.docs import get_docs_url
-from airflow.utils.file import (
-    correct_maybe_zipped,
-    get_unique_dag_module_name,
-    list_py_file_paths,
-    might_contain_dag,
-)
-from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.types import NOTSET
-
-try:
-    from airflow.sdk.exceptions import AirflowDagCycleException
-except ImportError:
-    from airflow.exceptions import AirflowDagCycleException  # type: ignore[no-redef]
 
 if TYPE_CHECKING:
     from collections.abc import Generator
 
     from sqlalchemy.orm import Session
 
-    from airflow import DAG
     from airflow.models import DagRun
-    from airflow.models.dagwarning import DagWarning
     from airflow.models.serialized_dag import SerializedDagModel
-    from airflow.utils.types import ArgNotSet
-
-
[email protected]
-def _capture_with_reraise() -> Generator[list[warnings.WarningMessage], None, None]:
-    """Capture warnings in context and re-raise it on exit from the context manager."""
-    captured_warnings = []
-    try:
-        with warnings.catch_warnings(record=True) as captured_warnings:
-            yield captured_warnings
-    finally:
-        if captured_warnings:
-            for cw in captured_warnings:
-                warnings.warn_explicit(
-                    message=cw.message,
-                    category=cw.category,
-                    filename=cw.filename,
-                    lineno=cw.lineno,
-                    source=cw.source,
-                )
-
-
-class FileLoadStat(NamedTuple):
-    """
-    Information about single file.
-
-    :param file: Loaded file.
-    :param duration: Time spent on process file.
-    :param dag_num: Total number of DAGs loaded in this file.
-    :param task_num: Total number of Tasks loaded in this file.
-    :param dags: DAGs names loaded in this file.
-    :param warning_num: Total number of warnings captured from processing this file.
-    """
-
-    file: str
-    duration: timedelta
-    dag_num: int
-    task_num: int
-    dags: str
-    warning_num: int
-
-
[email protected]
-def timeout(seconds=1, error_message="Timeout"):
-    import logging
-
-    log = logging.getLogger(__name__)
-    error_message = error_message + ", PID: " + str(os.getpid())
-
-    def handle_timeout(signum, frame):
-        """Log information and raises AirflowTaskTimeout."""
-        log.error("Process timed out, PID: %s", str(os.getpid()))
-        raise AirflowTaskTimeout(error_message)
-
-    try:
-        try:
-            signal.signal(signal.SIGALRM, handle_timeout)
-            signal.setitimer(signal.ITIMER_REAL, seconds)
-        except ValueError:
-            log.warning("timeout can't be used in the current context", exc_info=True)
-        yield
-    finally:
-        with contextlib.suppress(ValueError):
-            signal.setitimer(signal.ITIMER_REAL, 0)
-
-
-def _validate_executor_fields(dag: DAG) -> None:
-    for task in dag.tasks:
-        if not task.executor:
-            continue
-        try:
-            ExecutorLoader.lookup_executor_name_by_str(task.executor)
-        except UnknownExecutorException:
-            raise UnknownExecutorException(
-                f"Task '{task.task_id}' specifies executor '{task.executor}', which is not available. "
-                "Make sure it is listed in your [core] executors configuration, or update the task's "
-                "executor to use one of the configured executors."
-            )
-
-
-class DagBag(LoggingMixin):
-    """
-    A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings.
-
-    Some possible setting are database to use as a backend and what executor
-    to use to fire off tasks. This makes it easier to run distinct environments
-    for say production and development, tests, or for different teams or security
-    profiles. What would have been system level settings are now dagbag level so
-    that one system can run multiple, independent settings sets.
-
-    :param dag_folder: the folder to scan to find DAGs
-    :param include_examples: whether to include the examples that ship
-        with airflow or not
-    :param safe_mode: when ``False``, scans all python modules for dags.
-        When ``True`` uses heuristics (files containing ``DAG`` and ``airflow`` strings)
-        to filter python modules to scan for dags.
-    :param load_op_links: Should the extra operator link be loaded via plugins when
-        de-serializing the DAG? This flag is set to False in Scheduler so that Extra Operator links
-        are not loaded to not run User code in Scheduler.
-    :param collect_dags: when True, collects dags during class initialization.
-    :param known_pools: If not none, then generate warnings if a Task attempts to use an unknown pool.
-    """
-
-    def __init__(
-        self,
-        dag_folder: str | Path | None = None,  # todo AIP-66: rename this to path
-        include_examples: bool | ArgNotSet = NOTSET,
-        safe_mode: bool | ArgNotSet = NOTSET,
-        load_op_links: bool = True,
-        collect_dags: bool = True,
-        known_pools: set[str] | None = None,
-        bundle_path: Path | None = None,
-    ):
-        super().__init__()
-        self.bundle_path = bundle_path
-        include_examples = (
-            include_examples
-            if isinstance(include_examples, bool)
-            else conf.getboolean("core", "LOAD_EXAMPLES")
-        )
-        safe_mode = (
-            safe_mode if isinstance(safe_mode, bool) else conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE")
-        )
-
-        dag_folder = dag_folder or settings.DAGS_FOLDER
-        self.dag_folder = dag_folder
-        self.dags: dict[str, DAG] = {}
-        # the file's last modified timestamp when we last read it
-        self.file_last_changed: dict[str, datetime] = {}
-        # Store import errors with relative file paths as keys (relative to bundle_path)
-        self.import_errors: dict[str, str] = {}
-        self.captured_warnings: dict[str, tuple[str, ...]] = {}
-        self.has_logged = False
-        # Only used by SchedulerJob to compare the dag_hash to identify change in DAGs
-        self.dags_hash: dict[str, str] = {}
-
-        self.known_pools = known_pools
-
-        self.dagbag_import_error_tracebacks = conf.getboolean("core", "dagbag_import_error_tracebacks")
-        self.dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
-        if collect_dags:
-            self.collect_dags(
-                dag_folder=dag_folder,
-                include_examples=include_examples,
-                safe_mode=safe_mode,
-            )
-        # Should the extra operator link be loaded via plugins?
-        # This flag is set to False in Scheduler so that Extra Operator links are not loaded
-        self.load_op_links = load_op_links
-
-    def size(self) -> int:
-        """:return: the amount of dags contained in this dagbag"""
-        return len(self.dags)
-
-    @property
-    def dag_ids(self) -> list[str]:
-        """
-        Get DAG ids.
-
-        :return: a list of DAG IDs in this bag
-        """
-        return list(self.dags)
-
-    @provide_session
-    def get_dag(self, dag_id, session: Session = None):
-        """
-        Get the DAG out of the dictionary, and refreshes it if expired.
-
-        :param dag_id: DAG ID
-        """
-        # Avoid circular import
-        from airflow.models.dag import DagModel
-
-        dag = self.dags.get(dag_id)
-
-        # If DAG Model is absent, we can't check last_expired property. Is the DAG not yet synchronized?
-        if (orm_dag := DagModel.get_current(dag_id, session=session)) is None:
-            return dag
-
-        is_expired = (
-            orm_dag.last_expired and dag and dag.last_loaded and dag.last_loaded < orm_dag.last_expired
-        )
-        if is_expired:
-            # Remove associated dags so we can re-add them.
-            self.dags.pop(dag_id, None)
-        if dag is None or is_expired:
-            # Reprocess source file.
-            found_dags = self.process_file(
-                filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
-            )
-
-            # If the source file no longer exports `dag_id`, delete it from self.dags
-            if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
-                return self.dags[dag_id]
-            self.dags.pop(dag_id, None)
-        return self.dags.get(dag_id)
-
-    def process_file(self, filepath, only_if_updated=True, safe_mode=True):
-        """Given a path to a python module or zip file, import the module and look for dag objects within."""
-        from airflow.sdk.definitions._internal.contextmanager import DagContext
-
-        # if the source file no longer exists in the DB or in the filesystem,
-        # return an empty list
-        # todo: raise exception?
-
-        if filepath is None or not os.path.isfile(filepath):
-            return []
-
-        try:
-            # This failed before in what may have been a git sync
-            # race condition
-            file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
-            if (
-                only_if_updated
-                and filepath in self.file_last_changed
-                and file_last_changed_on_disk == self.file_last_changed[filepath]
-            ):
-                return []
-        except Exception as e:
-            self.log.exception(e)
-            return []
-
-        # Ensure we don't pick up anything else we didn't mean to
-        DagContext.autoregistered_dags.clear()
-
-        self.captured_warnings.pop(filepath, None)
-        with _capture_with_reraise() as captured_warnings:
-            if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
-                mods = self._load_modules_from_file(filepath, safe_mode)
-            else:
-                mods = self._load_modules_from_zip(filepath, safe_mode)
-
-        if captured_warnings:
-            formatted_warnings = []
-            for msg in captured_warnings:
-                category = msg.category.__name__
-                if (module := msg.category.__module__) != "builtins":
-                    category = f"{module}.{category}"
-                formatted_warnings.append(f"{msg.filename}:{msg.lineno}: {category}: {msg.message}")
-            self.captured_warnings[filepath] = tuple(formatted_warnings)
-
-        found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)
-
-        self.file_last_changed[filepath] = file_last_changed_on_disk
-        return found_dags
-
-    @property
-    def dag_warnings(self) -> set[DagWarning]:
-        """Get the set of DagWarnings for the bagged dags."""
-        from airflow.models.dagwarning import DagWarning, DagWarningType
-
-        # None means this feature is not enabled. Empty set means we don't know about any pools at all!
-        if self.known_pools is None:
-            return set()
-
-        def get_pools(dag) -> dict[str, set[str]]:
-            return {dag.dag_id: {task.pool for task in dag.tasks}}
-
-        pool_dict: dict[str, set[str]] = {}
-        for dag in self.dags.values():
-            pool_dict.update(get_pools(dag))
-
-        warnings: set[DagWarning] = set()
-        for dag_id, dag_pools in pool_dict.items():
-            nonexistent_pools = dag_pools - self.known_pools
-            if nonexistent_pools:
-                warnings.add(
-                    DagWarning(
-                        dag_id,
-                        DagWarningType.NONEXISTENT_POOL,
-                        f"Dag '{dag_id}' references non-existent pools: {sorted(nonexistent_pools)!r}",
-                    )
-                )
-        return warnings
-
-    def _get_relative_fileloc(self, filepath: str) -> str:
-        """
-        Get the relative file location for a given filepath.
-
-        :param filepath: Absolute path to the file
-        :return: Relative path from bundle_path, or original filepath if no bundle_path
-        """
-        if self.bundle_path:
-            return str(Path(filepath).relative_to(self.bundle_path))
-        return filepath
-
-    def _load_modules_from_file(self, filepath, safe_mode):
-        from airflow.sdk.definitions._internal.contextmanager import DagContext
-
-        def handler(signum, frame):
-            """Handle SIGSEGV signal and let the user know that the import failed."""
-            msg = f"Received SIGSEGV signal while processing {filepath}."
-            self.log.error(msg)
-            relative_filepath = self._get_relative_fileloc(filepath)
-            self.import_errors[relative_filepath] = msg
-
-        try:
-            signal.signal(signal.SIGSEGV, handler)
-        except ValueError:
-            self.log.warning("SIGSEGV signal handler registration failed. Not in the main thread")
-
-        if not might_contain_dag(filepath, safe_mode):
-            # Don't want to spam user with skip messages
-            if not self.has_logged:
-                self.has_logged = True
-                self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
-            return []
-
-        self.log.debug("Importing %s", filepath)
-        mod_name = get_unique_dag_module_name(filepath)
-
-        if mod_name in sys.modules:
-            del sys.modules[mod_name]
-
-        DagContext.current_autoregister_module_name = mod_name
-
-        def parse(mod_name, filepath):
-            try:
-                loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
-                spec = importlib.util.spec_from_loader(mod_name, loader)
-                new_module = importlib.util.module_from_spec(spec)
-                sys.modules[spec.name] = new_module
-                loader.exec_module(new_module)
-                return [new_module]
-            except KeyboardInterrupt:
-                # re-raise ctrl-c
-                raise
-            except BaseException as e:
-                # Normally you shouldn't catch BaseException, but in this case we want to, as, pytest.skip
-                # raises an exception which does not inherit from Exception, and we want to catch that here.
-                # This would also catch `exit()` in a dag file
-                DagContext.autoregistered_dags.clear()
-                self.log.exception("Failed to import: %s", filepath)
-                relative_filepath = self._get_relative_fileloc(filepath)
-                if self.dagbag_import_error_tracebacks:
-                    self.import_errors[relative_filepath] = traceback.format_exc(
-                        limit=-self.dagbag_import_error_traceback_depth
-                    )
-                else:
-                    self.import_errors[relative_filepath] = str(e)
-                return []
-
-        dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
-
-        if not isinstance(dagbag_import_timeout, (int, float)):
-            raise TypeError(
-                f"Value ({dagbag_import_timeout}) from get_dagbag_import_timeout must be int or float"
-            )
-
-        if dagbag_import_timeout <= 0:  # no parsing timeout
-            return parse(mod_name, filepath)
-
-        timeout_msg = (
-            f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n"
-            "Please take a look at these docs to improve your DAG import time:\n"
-            f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
-            f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
-        )
-        with timeout(dagbag_import_timeout, error_message=timeout_msg):
-            return parse(mod_name, filepath)
-
-    def _load_modules_from_zip(self, filepath, safe_mode):
-        from airflow.sdk.definitions._internal.contextmanager import DagContext
-
-        mods = []
-        with zipfile.ZipFile(filepath) as current_zip_file:
-            for zip_info in current_zip_file.infolist():
-                zip_path = Path(zip_info.filename)
-                if zip_path.suffix not in [".py", ".pyc"] or len(zip_path.parts) > 1:
-                    continue
-
-                if zip_path.stem == "__init__":
-                    self.log.warning("Found %s at root of %s", zip_path.name, filepath)
-
-                self.log.debug("Reading %s from %s", zip_info.filename, filepath)
-
-                if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
-                    # todo: create ignore list
-                    # Don't want to spam user with skip messages
-                    if not self.has_logged:
-                        self.has_logged = True
-                        self.log.info(
-                            "File %s:%s assumed to contain no DAGs. Skipping.", filepath, zip_info.filename
-                        )
-                    continue
-
-                mod_name = zip_path.stem
-                if mod_name in sys.modules:
-                    del sys.modules[mod_name]
-
-                DagContext.current_autoregister_module_name = mod_name
-                try:
-                    sys.path.insert(0, filepath)
-                    current_module = importlib.import_module(mod_name)
-                    mods.append(current_module)
-                except Exception as e:
-                    DagContext.autoregistered_dags.clear()
-                    fileloc = os.path.join(filepath, zip_info.filename)
-                    self.log.exception("Failed to import: %s", fileloc)
-                    relative_fileloc = self._get_relative_fileloc(fileloc)
-                    if self.dagbag_import_error_tracebacks:
-                        self.import_errors[relative_fileloc] = traceback.format_exc(
-                            limit=-self.dagbag_import_error_traceback_depth
-                        )
-                    else:
-                        self.import_errors[relative_fileloc] = str(e)
-                finally:
-                    if sys.path[0] == filepath:
-                        del sys.path[0]
-        return mods
-
-    def _process_modules(self, filepath, mods, file_last_changed_on_disk):
-        from airflow.sdk import DAG
-        from airflow.sdk.definitions._internal.contextmanager import DagContext
-
-        top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG)}
-
-        top_level_dags.update(DagContext.autoregistered_dags)
-
-        DagContext.current_autoregister_module_name = None
-        DagContext.autoregistered_dags.clear()
-
-        found_dags = []
-
-        for dag, mod in top_level_dags:
-            dag.fileloc = mod.__file__
-            relative_fileloc = self._get_relative_fileloc(dag.fileloc)
-            dag.relative_fileloc = relative_fileloc
-            try:
-                dag.validate()
-                _validate_executor_fields(dag)
-                self.bag_dag(dag=dag)
-            except AirflowClusterPolicySkipDag:
-                pass
-            except Exception as e:
-                self.log.exception("Failed to bag_dag: %s", dag.fileloc)
-                self.import_errors[relative_fileloc] = f"{type(e).__name__}: {e}"
-                self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
-            else:
-                found_dags.append(dag)
-        return found_dags
-
-    def bag_dag(self, dag: DAG):
-        """
-        Add the DAG into the bag.
-
-        :raises: AirflowDagCycleException if a cycle is detected.
-        :raises: AirflowDagDuplicatedIdException if this dag already exists in the bag.
-        """
-        dag.check_cycle()  # throws exception if a task cycle is found
-
-        dag.resolve_template_files()
-        dag.last_loaded = timezone.utcnow()
-
-        try:
-            # Check policies
-            settings.dag_policy(dag)
-
-            for task in dag.tasks:
-                # The listeners are not supported when ending a task via a trigger on asynchronous operators.
-                if getattr(task, "end_from_trigger", False) and get_listener_manager().has_listeners:
-                    raise AirflowException(
-                        "Listeners are not supported with end_from_trigger=True for deferrable operators. "
-                        "Task %s in DAG %s has end_from_trigger=True with listeners from plugins. "
-                        "Set end_from_trigger=False to use listeners.",
-                        task.task_id,
-                        dag.dag_id,
-                    )
-
-                settings.task_policy(task)
-        except (AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag):
-            raise
-        except Exception as e:
-            self.log.exception(e)
-            raise AirflowClusterPolicyError(e)
-
-        try:
-            prev_dag = self.dags.get(dag.dag_id)
-            if prev_dag and prev_dag.fileloc != dag.fileloc:
-                raise AirflowDagDuplicatedIdException(
-                    dag_id=dag.dag_id,
-                    incoming=dag.fileloc,
-                    existing=self.dags[dag.dag_id].fileloc,
-                )
-            self.dags[dag.dag_id] = dag
-            self.log.debug("Loaded DAG %s", dag)
-        except (AirflowDagCycleException, AirflowDagDuplicatedIdException):
-            # There was an error in bagging the dag. Remove it from the list of dags
-            self.log.exception("Exception bagging dag: %s", dag.dag_id)
-            raise
-
-    def collect_dags(
-        self,
-        dag_folder: str | Path | None = None,
-        only_if_updated: bool = True,
-        include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
-        safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
-    ):
-        """
-        Look for python modules in a given path, import them, and add them to the dagbag collection.
-
-        Note that if a ``.airflowignore`` file is found while processing
-        the directory, it will behave much like a ``.gitignore``,
-        ignoring files that match any of the patterns specified
-        in the file.
-
-        **Note**: The patterns in ``.airflowignore`` are interpreted as either
-        un-anchored regexes or gitignore-like glob expressions, depending on
-        the ``DAG_IGNORE_FILE_SYNTAX`` configuration parameter.
-        """
-        self.log.info("Filling up the DagBag from %s", dag_folder)
-        dag_folder = dag_folder or self.dag_folder
-        # Used to store stats around DagBag processing
-        stats = []
-
-        # Ensure dag_folder is a str -- it may have been a pathlib.Path
-        dag_folder = correct_maybe_zipped(str(dag_folder))
-
-        files_to_parse = list_py_file_paths(dag_folder, safe_mode=safe_mode)
-
-        if include_examples:
-            from airflow import example_dags
-
-            example_dag_folder = next(iter(example_dags.__path__))
-
-            files_to_parse.extend(list_py_file_paths(example_dag_folder, safe_mode=safe_mode))
-
-        for filepath in files_to_parse:
-            try:
-                file_parse_start_dttm = timezone.utcnow()
-                found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)
-
-                file_parse_end_dttm = timezone.utcnow()
-                stats.append(
-                    FileLoadStat(
-                        file=filepath.replace(settings.DAGS_FOLDER, ""),
-                        duration=file_parse_end_dttm - file_parse_start_dttm,
-                        dag_num=len(found_dags),
-                        task_num=sum(len(dag.tasks) for dag in found_dags),
-                        dags=str([dag.dag_id for dag in found_dags]),
-                        warning_num=len(self.captured_warnings.get(filepath, [])),
-                    )
-                )
-            except Exception as e:
-                self.log.exception(e)
-
-        self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)
-
-    def dagbag_report(self):
-        """Print a report around DagBag loading stats."""
-        stats = self.dagbag_stats
-        dag_folder = self.dag_folder
-        duration = sum((o.duration for o in stats), timedelta()).total_seconds()
-        dag_num = sum(o.dag_num for o in stats)
-        task_num = sum(o.task_num for o in stats)
-        table = tabulate(stats, headers="keys")
-
-        report = textwrap.dedent(
-            f"""\n
-        -------------------------------------------------------------------
-        DagBag loading stats for {dag_folder}
-        -------------------------------------------------------------------
-        Number of DAGs: {dag_num}
-        Total task number: {task_num}
-        DagBag parsing time: {duration}\n{table}
-        """
-        )
-        return report
-
-
-@provide_session
-def sync_bag_to_db(
-    dagbag: DagBag,
-    bundle_name: str,
-    bundle_version: str | None,
-    *,
-    session: Session = NEW_SESSION,
-) -> None:
-    """Save attributes about list of DAG to the DB."""
-    from airflow.dag_processing.collection import update_dag_parsing_results_in_db
-
-    import_errors = {(bundle_name, rel_path): error for rel_path, error in dagbag.import_errors.items()}
-    update_dag_parsing_results_in_db(
-        bundle_name,
-        bundle_version,
-        [LazyDeserializedDAG.from_dag(dag) for dag in dagbag.dags.values()],
-        import_errors,
-        None,  # file parsing duration is not well defined when parsing multiple files / multiple DAGs.
-        dagbag.dag_warnings,
-        session=session,
-    )
+    from airflow.serialization.serialized_objects import SerializedDAG
 
 
 class DBDagBag:
@@ -767,3 +131,26 @@ class DagPriorityParsingRequest(Base):
 
     def __repr__(self) -> str:
         return f"<DagPriorityParsingRequest: bundle_name={self.bundle_name} relative_fileloc={self.relative_fileloc}>"
+
+
+def __getattr__(name: str) -> Any:
+    """
+    Backwards-compat shim: importing DagBag from airflow.models.dagbag is deprecated.
+
+    Emits DeprecationWarning and re-exports DagBag from airflow.dag_processing.dagbag
+    to preserve compatibility for external callers.
+    """
+    if name in {"DagBag", "FileLoadStat", "timeout"}:
+        import warnings
+
+        warnings.warn(
+            f"Importing {name} from airflow.models.dagbag is deprecated and will be removed in a future "
+            "release. Please import from airflow.dag_processing.dagbag instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        # Import on demand to avoid import-time side effects
+        from airflow.dag_processing import dagbag as _dagbag
+
+        return getattr(_dagbag, name)
+    raise AttributeError(name)
diff --git a/airflow-core/src/airflow/utils/cli.py b/airflow-core/src/airflow/utils/cli.py
index b6423c5af3a..c96cbc005b1 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -36,6 +36,7 @@ from typing import TYPE_CHECKING, TypeVar, cast
 from airflow import settings
 from airflow._shared.timezones import timezone
 from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag
 from airflow.exceptions import AirflowException
 from airflow.sdk.definitions._internal.dag_parsing_context import _airflow_parsing_context_manager
 from airflow.utils import cli_action_loggers
@@ -230,7 +231,8 @@ def process_subdir(subdir: str | None):
 def get_dag_by_file_location(dag_id: str):
     """Return DAG of a given dag_id by looking up file location."""
     # TODO: AIP-66 - investigate more, can we use serdag?
-    from airflow.models import DagBag, DagModel
+    from airflow.dag_processing.dagbag import DagBag
+    from airflow.models import DagModel
 
     # Benefit is that logging from other dags in dagbag will not appear
     dag_model = DagModel.get_current(dag_id)
@@ -271,7 +273,7 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | N
     find the correct path (assuming it's a file) and failing that, use the configured
     dags folder.
     """
-    from airflow.models.dagbag import DagBag, sync_bag_to_db
+    from airflow.dag_processing.dagbag import sync_bag_to_db
 
     manager = DagBundlesManager()
     for bundle_name in bundle_names or ():
@@ -315,7 +317,7 @@ def get_db_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | None
 
 def get_dags(bundle_names: list | None, dag_id: str, use_regex: bool = False, from_db: bool = False):
     """Return DAG(s) matching a given regex or dag_id."""
-    from airflow.models import DagBag
+    from airflow.dag_processing.dagbag import DagBag
 
     bundle_names = bundle_names or []
 
diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index 4dd9874f736..fab772935f7 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -28,9 +28,10 @@ from sqlalchemy import select
 
 from airflow._shared.timezones import timezone
 from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag
 from airflow.executors import executor_loader
 from airflow.executors.executor_utils import ExecutorName
-from airflow.models import DAG, DagBag, DagRun
+from airflow.models import DAG, DagRun
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.serialization.serialized_objects import SerializedDAG
diff --git a/airflow-core/tests/unit/always/test_example_dags.py b/airflow-core/tests/unit/always/test_example_dags.py
index 1744a3f88a4..feb62c52b6b 100644
--- a/airflow-core/tests/unit/always/test_example_dags.py
+++ b/airflow-core/tests/unit/always/test_example_dags.py
@@ -27,7 +27,8 @@ import pytest
 from packaging.specifiers import SpecifierSet
 from packaging.version import Version
 
-from airflow.models import Connection, DagBag
+from airflow.dag_processing.dagbag import DagBag
+from airflow.models import Connection
 from airflow.sdk import BaseHook
 from airflow.utils import yaml
 
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
index ead2710ad84..069208cd683 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
@@ -25,7 +25,8 @@ import pytest
 from sqlalchemy import and_, func, select
 
 from airflow._shared.timezones import timezone
-from airflow.models import DagBag, DagModel, DagRun
+from airflow.dag_processing.dagbag import DagBag
+from airflow.models import DagModel, DagRun
 from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill
 from airflow.models.dag import DAG
 from airflow.models.dagbundle import DagBundleModel
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_report.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_report.py
index 3938894001d..b1d37ac3e42 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_report.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_report.py
@@ -95,7 +95,7 @@ class TestDagReportEndpoint:
         def _mock_collect_dags(self, *args, **kwargs):
             self.dagbag_stats = []
 
-        with patch("airflow.models.dagbag.DagBag.collect_dags", _mock_collect_dags):
+        with patch("airflow.dag_processing.dagbag.DagBag.collect_dags", _mock_collect_dags):
             response = test_client.get("/dagReports", params={"subdir": TEST_DAG_FOLDER})
             assert response.status_code == 200
             assert response.json() == {"dag_reports": [], "total_entries": 0}
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 0f26235088a..0c06e83392c 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -30,12 +30,12 @@ from sqlalchemy import select
 
 from airflow._shared.timezones.timezone import datetime
 from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
 from airflow.jobs.job import Job
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 from airflow.listeners.listener import get_listener_manager
 from airflow.models import DagRun, Log, TaskInstance
 from airflow.models.dag_version import DagVersion
-from airflow.models.dagbag import DagBag, sync_bag_to_db
 from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
 from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.models.taskmap import TaskMap
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index f6cae39b720..10332a37a58 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -34,9 +34,10 @@ from airflow import settings
 from airflow._shared.timezones import timezone
 from airflow.cli import cli_parser
 from airflow.cli.commands import dag_command
+from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun
-from airflow.models.dagbag import DBDagBag, sync_bag_to_db
+from airflow.models import DagModel, DagRun
+from airflow.models.dagbag import DBDagBag
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.sdk import BaseOperator, task
@@ -759,7 +760,7 @@ class TestCliDags:
         mock_render_dag.assert_has_calls([mock.call(mock_get_dag.return_value, tis=[])])
         assert "SOURCE" in output
 
-    @mock.patch("airflow.models.dagbag.DagBag")
+    @mock.patch("airflow.utils.cli.DagBag")
     def test_dag_test_with_bundle_name(self, mock_dagbag, configure_dag_bundles):
         """Test that DAG can be tested using bundle name."""
         mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun(
@@ -786,7 +787,7 @@ class TestCliDags:
             include_examples=False,
         )
 
-    @mock.patch("airflow.models.dagbag.DagBag")
+    @mock.patch("airflow.utils.cli.DagBag")
     def test_dag_test_with_dagfile_path(self, mock_dagbag, configure_dag_bundles):
         """Test that DAG can be tested using dagfile path."""
         mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun(
@@ -807,7 +808,7 @@ class TestCliDags:
             include_examples=False,
         )
 
-    @mock.patch("airflow.models.dagbag.DagBag")
+    @mock.patch("airflow.utils.cli.DagBag")
     def test_dag_test_with_both_bundle_and_dagfile_path(self, mock_dagbag, configure_dag_bundles):
         """Test that DAG can be tested using both bundle name and dagfile path."""
         mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun(
diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py b/airflow-core/tests/unit/cli/commands/test_task_command.py
index 532324d06c6..1c7b227bb6f 100644
--- a/airflow-core/tests/unit/cli/commands/test_task_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_task_command.py
@@ -35,8 +35,9 @@ from airflow._shared.timezones import timezone
 from airflow.cli import cli_parser
 from airflow.cli.commands import task_command
 from airflow.configuration import conf
+from airflow.dag_processing.dagbag import DagBag
 from airflow.exceptions import DagRunNotFound
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import DagModel, DagRun, TaskInstance
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagbag import DBDagBag
 from airflow.models.serialized_dag import SerializedDagModel
diff --git a/airflow-core/tests/unit/cli/conftest.py b/airflow-core/tests/unit/cli/conftest.py
index d65ea767013..9c20c9d7e6a 100644
--- a/airflow-core/tests/unit/cli/conftest.py
+++ b/airflow-core/tests/unit/cli/conftest.py
@@ -21,8 +21,8 @@ import sys
 
 import pytest
 
+from airflow.dag_processing.dagbag import DagBag
 from airflow.executors import local_executor
-from airflow.models.dagbag import DagBag
 from airflow.providers.celery.executors import celery_executor
 from airflow.providers.cncf.kubernetes.executors import kubernetes_executor
 
diff --git a/airflow-core/tests/unit/core/test_impersonation_tests.py b/airflow-core/tests/unit/core/test_impersonation_tests.py
index 62c8effb5b2..c4ca10d71df 100644
--- a/airflow-core/tests/unit/core/test_impersonation_tests.py
+++ b/airflow-core/tests/unit/core/test_impersonation_tests.py
@@ -29,7 +29,8 @@ import pytest
 
 from airflow._shared.timezones.timezone import datetime
 from airflow.configuration import conf
-from airflow.models import DagBag, TaskInstance
+from airflow.dag_processing.dagbag import DagBag
+from airflow.models import TaskInstance
 from airflow.utils.db import add_default_pool_if_not_exists
 from airflow.utils.state import State
 
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index c9748c5058c..1b023381bf5 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -42,13 +42,14 @@ from uuid6 import uuid7
 from airflow._shared.timezones import timezone
 from airflow.callbacks.callback_requests import DagCallbackRequest
 from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.dag_processing.dagbag import DagBag
 from airflow.dag_processing.manager import (
     DagFileInfo,
     DagFileProcessorManager,
     DagFileStat,
 )
 from airflow.dag_processing.processor import DagFileProcessorProcess
-from airflow.models import DagBag, DagModel, DbCallbackRequest
+from airflow.models import DagModel, DbCallbackRequest
 from airflow.models.asset import TaskOutletAssetReference
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagbundle import DagBundleModel
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 8acba729f05..5d3d6622d69 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -47,6 +47,7 @@ from airflow.callbacks.callback_requests import (
     EmailNotificationRequest,
     TaskCallbackRequest,
 )
+from airflow.dag_processing.dagbag import DagBag
 from airflow.dag_processing.manager import process_parse_results
 from airflow.dag_processing.processor import (
     DagFileParseRequest,
@@ -58,7 +59,7 @@ from airflow.dag_processing.processor import (
     _parse_file,
     _pre_import_airflow_modules,
 )
-from airflow.models import DagBag, DagRun
+from airflow.models import DagRun
 from airflow.sdk import DAG, BaseOperator
 from airflow.sdk.api.client import Client
 from airflow.sdk.api.datamodels._generated import DagRunState
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index c6b1edfed8e..feceac3de32 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -44,6 +44,7 @@ from airflow.assets.manager import AssetManager
 from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext, TaskCallbackRequest
 from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
 from airflow.dag_processing.collection import AssetModelOperation, DagModelOperation
+from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow.executors.executor_constants import MOCK_EXECUTOR
@@ -55,7 +56,6 @@ from airflow.models.asset import AssetActive, AssetAliasModel, AssetDagRunQueue,
 from airflow.models.backfill import Backfill, _create_backfill
 from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval
 from airflow.models.dag_version import DagVersion
-from airflow.models.dagbag import DagBag, sync_bag_to_db
 from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 53cafab3543..ee0445bc869 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -37,8 +37,8 @@ from airflow import settings
 from airflow._shared.timezones import timezone
 from airflow._shared.timezones.timezone import datetime as datetime_tz
 from airflow.configuration import conf
+from airflow.dag_processing.dagbag import DagBag
 from airflow.exceptions import AirflowException, ParamValidationError
-from airflow.models import DagBag
 from airflow.models.asset import (
     AssetAliasModel,
     AssetDagRunQueue,
diff --git a/airflow-core/tests/unit/models/test_dagbag.py b/airflow-core/tests/unit/models/test_dagbag.py
index 943d9f094b5..7ab82622cda 100644
--- a/airflow-core/tests/unit/models/test_dagbag.py
+++ b/airflow-core/tests/unit/models/test_dagbag.py
@@ -35,10 +35,10 @@ import pytest
 from sqlalchemy import select
 
 from airflow import settings
+from airflow.dag_processing.dagbag import DagBag, _capture_with_reraise, _validate_executor_fields
 from airflow.exceptions import UnknownExecutorException
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.models.dag import DagModel
-from airflow.models.dagbag import DagBag, _capture_with_reraise, _validate_executor_fields
 from airflow.models.dagwarning import DagWarning, DagWarningType
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.sdk import DAG, BaseOperator
@@ -101,8 +101,8 @@ class TestDagBag:
         """Test that the timeout context manager raises AirflowTaskTimeout 
when time limit is exceeded."""
         import time
 
+        from airflow.dag_processing.dagbag import timeout
         from airflow.exceptions import AirflowTaskTimeout
-        from airflow.models.dagbag import timeout
 
         with pytest.raises(AirflowTaskTimeout):
             with timeout(1, "Test timeout"):
@@ -249,8 +249,8 @@ class TestDagBag:
         assert sys.path == syspath_before  # sys.path doesn't change
         assert not dagbag.import_errors
 
-    @patch("airflow.models.dagbag.timeout")
-    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    @patch("airflow.dag_processing.dagbag.timeout")
+    @patch("airflow.dag_processing.dagbag.settings.get_dagbag_import_timeout")
     def test_process_dag_file_without_timeout(
         self, mocked_get_dagbag_import_timeout, mocked_timeout, tmp_path
     ):
@@ -268,8 +268,8 @@ class TestDagBag:
         dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_sensor.py"))
         mocked_timeout.assert_not_called()
 
-    @patch("airflow.models.dagbag.timeout")
-    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    @patch("airflow.dag_processing.dagbag.timeout")
+    @patch("airflow.dag_processing.dagbag.settings.get_dagbag_import_timeout")
     def test_process_dag_file_with_non_default_timeout(
         self, mocked_get_dagbag_import_timeout, mocked_timeout, tmp_path
     ):
@@ -287,7 +287,7 @@ class TestDagBag:
 
         mocked_timeout.assert_called_once_with(timeout_value, error_message=mock.ANY)
 
-    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    @patch("airflow.dag_processing.dagbag.settings.get_dagbag_import_timeout")
     def test_check_value_type_from_get_dagbag_import_timeout(
         self, mocked_get_dagbag_import_timeout, tmp_path
     ):
@@ -861,7 +861,7 @@ with airflow.DAG(
                 """
             )
         )
-        with mock.patch("airflow.models.dagbag.signal.signal") as mock_signal:
+        with mock.patch("airflow.dag_processing.dagbag.signal.signal") as 
mock_signal:
             mock_signal.side_effect = ValueError("Invalid signal setting")
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
diff --git a/airflow-core/tests/unit/models/test_dagcode.py b/airflow-core/tests/unit/models/test_dagcode.py
index 14b733e13eb..b98499ca238 100644
--- a/airflow-core/tests/unit/models/test_dagcode.py
+++ b/airflow-core/tests/unit/models/test_dagcode.py
@@ -24,7 +24,7 @@ import pytest
 from sqlalchemy.exc import IntegrityError
 
 import airflow.example_dags as example_dags_module
-from airflow.models import DagBag
+from airflow.dag_processing.dagbag import DagBag
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagcode import DagCode
 from airflow.sdk import task as task_decorator
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index 73284f67356..3ec2eccef01 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -77,7 +77,7 @@ async def empty_callback_for_deadline():
 
 @pytest.fixture(scope="module")
 def dagbag():
-    from airflow.models.dagbag import DagBag
+    from airflow.dag_processing.dagbag import DagBag
 
     return DagBag(include_examples=True)
 
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index ba207cc39ec..e310836a391 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -26,10 +26,10 @@ import pytest
 from sqlalchemy import func, select, update
 
 import airflow.example_dags as example_dags_module
+from airflow.dag_processing.dagbag import DagBag
 from airflow.models.asset import AssetActive, AssetAliasModel, AssetModel
 from airflow.models.dag import DagModel
 from airflow.models.dag_version import DagVersion
-from airflow.models.dagbag import DagBag
 from airflow.models.serialized_dag import SerializedDagModel as SDM
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 6df1b225350..7fcc987cd46 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -46,6 +46,7 @@ from kubernetes.client import models as k8s
 
 import airflow
 from airflow._shared.timezones import timezone
+from airflow.dag_processing.dagbag import DagBag
 from airflow.exceptions import (
     AirflowException,
     ParamValidationError,
@@ -53,7 +54,6 @@ from airflow.exceptions import (
 )
 from airflow.models.asset import AssetModel
 from airflow.models.connection import Connection
-from airflow.models.dagbag import DagBag
 from airflow.models.mappedoperator import MappedOperator
 from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py
index a50dba08921..b7c074388b4 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -1175,7 +1175,7 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
 
         def sync_dagbag_to_db(self):
             if AIRFLOW_V_3_1_PLUS:
-                from airflow.models.dagbag import sync_bag_to_db
+                from airflow.dag_processing.dagbag import sync_bag_to_db
 
                 sync_bag_to_db(self.dagbag, self.bundle_name, None)
             elif AIRFLOW_V_3_0_PLUS:
@@ -1622,7 +1622,13 @@ def session():
 def get_test_dag():
     def _get(dag_id: str):
         from airflow import settings
-        from airflow.models.dagbag import DagBag
+
+        from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS
+
+        if AIRFLOW_V_3_1_PLUS:
+            from airflow.dag_processing.dagbag import DagBag
+        else:
+            from airflow.models.dagbag import DagBag  # type: ignore[no-redef, attr-defined]
         from airflow.models.serialized_dag import SerializedDagModel
 
         from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
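
The guarded import that get_test_dag gains above is the pattern repeated
throughout the rest of this diff. As a standalone sketch, assuming only that
AIRFLOW_V_3_1_PLUS is the boolean flag exposed by
tests_common.test_utils.version_compat:

    from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS

    if AIRFLOW_V_3_1_PLUS:
        # Airflow 3.1+: DagBag lives alongside the dag processor.
        from airflow.dag_processing.dagbag import DagBag
    else:
        # Older Airflow: fall back to the original model module.
        from airflow.models.dagbag import DagBag  # type: ignore[no-redef]
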
diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py
index 9e8f729a9f8..c9d964641e3 100644
--- a/devel-common/src/tests_common/test_utils/db.py
+++ b/devel-common/src/tests_common/test_utils/db.py
@@ -90,7 +90,10 @@ def _deactivate_unknown_dags(active_dag_ids, session):
 
 
 def _bootstrap_dagbag():
-    from airflow.models.dagbag import DagBag
+    if AIRFLOW_V_3_1_PLUS:
+        from airflow.dag_processing.dagbag import DagBag
+    else:  # back-compat for Airflow <3.1
+        from airflow.models.dagbag import DagBag  # type: ignore[no-redef, attr-defined]
 
     if AIRFLOW_V_3_0_PLUS:
         from airflow.dag_processing.bundles.manager import DagBundlesManager
@@ -103,7 +106,7 @@ def _bootstrap_dagbag():
         dagbag = DagBag()
         # Save DAGs in the ORM
         if AIRFLOW_V_3_1_PLUS:
-            from airflow.models.dagbag import sync_bag_to_db
+            from airflow.dag_processing.dagbag import sync_bag_to_db
 
             sync_bag_to_db(dagbag, bundle_name="dags-folder", 
bundle_version=None, session=session)
         elif AIRFLOW_V_3_0_PLUS:
@@ -163,7 +166,10 @@ def initial_db_init():
 
 
 def parse_and_sync_to_db(folder: Path | str, include_examples: bool = False):
-    from airflow.models.dagbag import DagBag
+    if AIRFLOW_V_3_1_PLUS:
+        from airflow.dag_processing.dagbag import DagBag
+    else:
+        from airflow.models.dagbag import DagBag  # type: ignore[no-redef, attr-defined]
 
     if AIRFLOW_V_3_0_PLUS:
         from airflow.dag_processing.bundles.manager import DagBundlesManager
@@ -175,7 +181,7 @@ def parse_and_sync_to_db(folder: Path | str, include_examples: bool = False):
 
         dagbag = DagBag(dag_folder=folder, include_examples=include_examples)
         if AIRFLOW_V_3_1_PLUS:
-            from airflow.models.dagbag import sync_bag_to_db
+            from airflow.dag_processing.dagbag import sync_bag_to_db
 
             sync_bag_to_db(dagbag, "dags-folder", None, session=session)
         elif AIRFLOW_V_3_0_PLUS:
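
On 3.1+, parse_and_sync_to_db above reduces to a two-step parse-then-persist
flow: build a DagBag from a folder, then hand it to sync_bag_to_db. A hedged
sketch of that flow on its own (the dag folder path is illustrative; the
bundle name mirrors the "dags-folder" literal used above):

    from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
    from airflow.utils.session import create_session

    # Step 1: parse the DAG files in a folder into an in-memory bag.
    dagbag = DagBag(dag_folder="/files/dags", include_examples=False)

    # Step 2: persist the parsed DAGs to the metadata database.
    with create_session() as session:
        sync_bag_to_db(dagbag, "dags-folder", None, session=session)
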
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
index 488d57b66c4..193092322c3 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -127,7 +127,7 @@ if AIRFLOW_V_3_1_PLUS:
         with create_session() as session:
             yield from DBDagBag().iter_all_latest_version_dags(session=session)
 else:
-    from airflow.models.dagbag import DagBag
+    from airflow.models.dagbag import DagBag  # type: ignore[attr-defined, no-redef]
 
     def _iter_dags() -> Iterable[DAG | SerializedDAG]:
         dagbag = DagBag(read_dags_from_db=True)  # type: ignore[call-arg]
diff --git a/providers/fab/src/airflow/providers/fab/www/airflow_flask_app.py b/providers/fab/src/airflow/providers/fab/www/airflow_flask_app.py
index 8db3a1eb903..56c9f830cf3 100644
--- a/providers/fab/src/airflow/providers/fab/www/airflow_flask_app.py
+++ b/providers/fab/src/airflow/providers/fab/www/airflow_flask_app.py
@@ -21,7 +21,12 @@ from typing import TYPE_CHECKING, Any
 from flask import Flask
 
 if TYPE_CHECKING:
-    from airflow.models.dagbag import DagBag
+    from airflow.providers.fab.version_compat import AIRFLOW_V_3_1_PLUS
+
+    if AIRFLOW_V_3_1_PLUS:
+        from airflow.models.dagbag import DBDagBag as DagBag
+    else:
+        from airflow.models.dagbag import DagBag  # type: ignore[no-redef]
 
 
 class AirflowApp(Flask):
diff --git a/providers/fab/src/airflow/providers/fab/www/static/dist/airflowDefaultTheme.ff5a35f322070b094aa2.css b/providers/fab/src/airflow/providers/fab/www/static/dist/airflowDefaultTheme.ff5a35f322070b094aa2.css
index 0e29eda7dac..dc37b46e63f 100644
--- a/providers/fab/src/airflow/providers/fab/www/static/dist/airflowDefaultTheme.ff5a35f322070b094aa2.css
+++ b/providers/fab/src/airflow/providers/fab/www/static/dist/airflowDefaultTheme.ff5a35f322070b094aa2.css
@@ -30,4 +30,4 @@
 
 /*! normalize.css v3.0.2 | MIT License | git.io/normalize */html{font-family:sans-serif;-ms-text-size-adjust:100%;-webkit-text-size-adjust:100%}html[data-color-scheme=dark]{filter:invert(100%) hue-rotate(180deg) saturate(90%) contrast(85%)}#dark_icon,#light_icon{display:none}html[data-color-scheme=dark] #dark_icon{display:block}html[data-color-scheme=dark] #light_icon,html[data-color-scheme=light] #dark_icon{display:none}html[data-color-scheme=light] #light_icon{display:block}body{margin [...]
 
-/*! Source: https://github.com/h5bp/html5-boilerplate/blob/master/src/css/main.css */@media print{*,:after,:before{background:#0000!important;box-shadow:none!important;color:#000!important;text-shadow:none!important}a,a:visited{text-decoration:underline}a[href]:after{content:" (" attr(href) ")"}abbr[title]:after{content:" (" attr(title) ")"}a[href^="#"]:after,a[href^="javascript:"]:after{content:""}blockquote,pre{page-break-inside:avoid}thead{display:table-header-group}img,tr{page-break- [...]
\ No newline at end of file
+/*! Source: https://github.com/h5bp/html5-boilerplate/blob/master/src/css/main.css */@media print{*,:after,:before{background:#0000!important;box-shadow:none!important;color:#000!important;text-shadow:none!important}a,a:visited{text-decoration:underline}a[href]:after{content:" (" attr(href) ")"}abbr[title]:after{content:" (" attr(title) ")"}a[href^="#"]:after,a[href^="javascript:"]:after{content:""}blockquote,pre{page-break-inside:avoid}thead{display:table-header-group}img,tr{page-break- [...]
\ No newline at end of file
diff --git a/providers/fab/src/airflow/providers/fab/www/static/dist/materialIcons.3e67dd6fbfcc4f3b5105.css b/providers/fab/src/airflow/providers/fab/www/static/dist/materialIcons.3e67dd6fbfcc4f3b5105.css
index c10472d9cd0..e4b7e396d3c 100644
--- a/providers/fab/src/airflow/providers/fab/www/static/dist/materialIcons.3e67dd6fbfcc4f3b5105.css
+++ b/providers/fab/src/airflow/providers/fab/www/static/dist/materialIcons.3e67dd6fbfcc4f3b5105.css
@@ -15,4 +15,4 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- */@font-face{font-family:Material Icons;font-style:normal;font-weight:400;src:url(data:font/woff2;base64,d09GMgABAAAAAUI8AA4AAAADjOAAAUHkAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGhwbEByCzyoGYACwGhEICorJeIiAFAuwWAABNgIkA5gwBCAFgnoHIFuvznIDpyBryeWaoZ2jbV/h8F/QPjQVyrZdLHXexkto0GNMDdumQfSgOyhSwX2lL/v//89PKjJm20uyrrBtgAcBf70XMBlJPMc4ThIlIFVaT46i3HvL0aKMLvNMab5I12Ve1V2SfJsb7np3ScN7mPsRax9dxfBqO6Fz6tPkk0QcgdrGiFygr7T3/dJeUfRMt8IuknUFvw4YHYSVSIvQCSiexI3nU/9ARmj+Cg1SQI1DUV+UvEIe7pLu8yPlrKtRXgomrPXYSnkbMb [...]
\ No newline at end of file
+ */@font-face{font-family:Material Icons;font-style:normal;font-weight:400;src:url(data:font/woff2;base64,d09GMgABAAAAAUI8AA4AAAADjOAAAUHkAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGhwbEByCzyoGYACwGhEICorJeIiAFAuwWAABNgIkA5gwBCAFgnoHIFuvznIDpyBryeWaoZ2jbV/h8F/QPjQVyrZdLHXexkto0GNMDdumQfSgOyhSwX2lL/v//89PKjJm20uyrrBtgAcBf70XMBlJPMc4ThIlIFVaT46i3HvL0aKMLvNMab5I12Ve1V2SfJsb7np3ScN7mPsRax9dxfBqO6Fz6tPkk0QcgdrGiFygr7T3/dJeUfRMt8IuknUFvw4YHYSVSIvQCSiexI3nU/9ARmj+Cg1SQI1DUV+UvEIe7pLu8yPlrKtRXgomrPXYSnkbMb [...]
\ No newline at end of file
diff --git a/providers/fab/www-hash.txt b/providers/fab/www-hash.txt
index 2332f6b6405..19fcf7ec24b 100644
--- a/providers/fab/www-hash.txt
+++ b/providers/fab/www-hash.txt
@@ -1 +1 @@
-7a4ff69952843dfb3a2b46f1853ccfc797ae551aeed4ed5a30d96958ec476277
+86677c91d374f984777c7270fc1f43e65146f72ca63820ce68ec261f38afa8ff
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
index f0706667455..fb84004e7ee 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
@@ -28,7 +28,7 @@ import pytest
 
 from airflow.jobs.job import Job
 from airflow.listeners.listener import get_listener_manager
-from airflow.models import DagBag, TaskInstance
+from airflow.models import TaskInstance
 from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain
 from airflow.providers.openlineage.plugins.listener import OpenLineageListener
 from airflow.utils import timezone
@@ -37,7 +37,12 @@ from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
+
+if AIRFLOW_V_3_1_PLUS:
+    from airflow.dag_processing.dagbag import DagBag
+else:
+    from airflow.models.dagbag import DagBag  # type: ignore[attr-defined, no-redef]
 
 TEST_DAG_FOLDER = os.path.join(os.path.dirname(os.path.dirname(__file__)), "dags")
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index ffff3f85517..8b35d0ad665 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -25,7 +25,6 @@ from typing import TYPE_CHECKING, Any, ClassVar
 from airflow.configuration import conf
 from airflow.exceptions import AirflowSkipException
 from airflow.models.dag import DagModel
-from airflow.models.dagbag import DagBag
 from airflow.providers.standard.exceptions import (
     DuplicateStateError,
     ExternalDagDeletedError,
@@ -41,6 +40,7 @@ from airflow.providers.standard.triggers.external_task import WorkflowTrigger
 from airflow.providers.standard.utils.sensor_helper import _get_count, _get_external_task_group_task_ids
 from airflow.providers.standard.version_compat import (
     AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_1_PLUS,
     BaseOperator,
     BaseOperatorLink,
     BaseSensorOperator,
@@ -51,6 +51,11 @@ from airflow.utils.state import State, TaskInstanceState
 if not AIRFLOW_V_3_0_PLUS:
     from airflow.utils.session import NEW_SESSION, provide_session
 
+if AIRFLOW_V_3_1_PLUS:
+    from airflow.dag_processing.dagbag import DagBag
+else:
+    from airflow.models.dagbag import DagBag  # type: ignore[attr-defined, no-redef]
+
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
diff --git a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index db8c1c01cd0..a1897f5ffbf 100644
--- a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++ b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -27,7 +27,7 @@ import pytest
 
 from airflow import settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException, TaskDeferred
-from airflow.models import DagBag, DagRun, TaskInstance
+from airflow.models import DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.xcom_arg import XComArg
@@ -67,9 +67,11 @@ else:
     from airflow.models import BaseOperator  # type: ignore[assignment,no-redef]
 
 if AIRFLOW_V_3_1_PLUS:
+    from airflow.dag_processing.dagbag import DagBag
     from airflow.sdk import TaskGroup
     from airflow.sdk.timezone import coerce_datetime, datetime
 else:
+    from airflow.models.dagbag import DagBag  # type: ignore[attr-defined, no-redef]
     from airflow.utils.task_group import TaskGroup  # type: ignore[no-redef]
     from airflow.utils.timezone import coerce_datetime, datetime  # type: ignore[attr-defined,no-redef]
 
diff --git a/providers/standard/tests/unit/standard/sensors/test_time_delta.py b/providers/standard/tests/unit/standard/sensors/test_time_delta.py
index 4c2ad8129b6..629783021c1 100644
--- a/providers/standard/tests/unit/standard/sensors/test_time_delta.py
+++ b/providers/standard/tests/unit/standard/sensors/test_time_delta.py
@@ -25,7 +25,6 @@ import pytest
 import time_machine
 
 from airflow.exceptions import AirflowProviderDeprecationWarning, TaskDeferred
-from airflow.models import DagBag
 from airflow.models.dag import DAG
 from airflow.providers.standard.sensors.time_delta import (
     TimeDeltaSensor,
@@ -36,7 +35,12 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils import db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, timezone
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, timezone
+
+if AIRFLOW_V_3_1_PLUS:
+    from airflow.dag_processing.dagbag import DagBag
+else:
+    from airflow.models.dagbag import DagBag  # type: ignore[attr-defined, no-redef]
 
 pytestmark = pytest.mark.db_test
 
diff --git a/providers/standard/tests/unit/standard/sensors/test_weekday.py b/providers/standard/tests/unit/standard/sensors/test_weekday.py
index 3f7fa41ea26..d789a085061 100644
--- a/providers/standard/tests/unit/standard/sensors/test_weekday.py
+++ b/providers/standard/tests/unit/standard/sensors/test_weekday.py
@@ -22,13 +22,17 @@ from datetime import timedelta
 import pytest
 
 from airflow.exceptions import AirflowSensorTimeout
-from airflow.models import DagBag
 from airflow.models.dag import DAG
 from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
 from airflow.providers.standard.utils.weekday import WeekDay
 
 from tests_common.test_utils import db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, timezone
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, timezone
+
+if AIRFLOW_V_3_1_PLUS:
+    from airflow.dag_processing.dagbag import DagBag
+else:
+    from airflow.models import DagBag  # type: ignore[attr-defined, no-redef]
 
 pytestmark = pytest.mark.db_test
 
diff --git a/providers/standard/tests/unit/standard/utils/test_sensor_helper.py b/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
index 156d2958a54..9265b292fcc 100644
--- a/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
+++ b/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
@@ -25,8 +25,7 @@ from unittest import mock
 import pendulum
 import pytest
 
-from airflow.models import DAG, TaskInstance
-from airflow.models.dagbag import DagBag
+from airflow.models import DAG, DagBag, TaskInstance
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.utils.sensor_helper import (
     _count_stmt,
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 409982d1a6b..85eb7d514e2 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -602,8 +602,7 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str, value: Any) -> None:
 def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
     # TODO: Task-SDK:
     # Using DagBag here is about 98% wrong, but it'll do for now
-
-    from airflow.models.dagbag import DagBag
+    from airflow.dag_processing.dagbag import DagBag
 
     bundle_info = what.bundle_info
     bundle_instance = DagBundlesManager().get_bundle(
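
For code that still imports DagBag from the old location, the migration is a
one-line change to the new path. A minimal usage sketch under the new import
(the dag folder path is illustrative):

    from airflow.dag_processing.dagbag import DagBag

    bag = DagBag(dag_folder="/files/dags", include_examples=False)
    print(sorted(bag.dag_ids))  # DAG ids collected during parsing
    print(bag.import_errors)    # file path -> traceback for files that failed
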
