This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 20b387016e7 Refactor SchedularDagBag and use it across the API server 
(#53153)
20b387016e7 is described below

commit 20b387016e7414b2ebf15259918dd2002387c3b9
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Jul 29 12:17:40 2025 +0100

    Refactor SchedularDagBag and use it across the API server (#53153)
    
    * Refactor SchedularDagBag and use it across the API server
    
    This removes the use of DagBag which parses files in some cases and
    replaces it with refactored SchedulerDagBag.
    
    * fixup! Merge branch 'refactor-schedulerdagbag' of 
github.com:astronomer/airflow into refactor-schedulerdagbag
    
    * Apply suggestions from code review
    
    Co-authored-by: Copilot <[email protected]>
    
    * fixup! Apply suggestions from code review
    
    * fixup! fixup! Apply suggestions from code review
    
    * fixup! fixup! fixup! Apply suggestions from code review
    
    * fixup! fixup! fixup! fixup! Apply suggestions from code review
    
    * fixup! fixup! fixup! fixup! fixup! Apply suggestions from code review
    
    * fixup! fixup! fixup! fixup! fixup! fixup! Apply suggestions from code 
review
    
    * Fix mypy
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 airflow-core/src/airflow/api_fastapi/app.py        |  4 +-
 .../src/airflow/api_fastapi/common/dagbag.py       | 11 ++--
 .../api_fastapi/core_api/datamodels/dag_run.py     |  2 +-
 .../core_api/datamodels/task_instances.py          |  2 +-
 .../core_api/openapi/v2-rest-api-generated.yaml    |  6 +-
 .../api_fastapi/core_api/routes/public/assets.py   |  2 +-
 .../api_fastapi/core_api/routes/public/dag_run.py  | 16 +++--
 .../core_api/routes/public/dag_versions.py         |  3 +-
 .../api_fastapi/core_api/routes/public/dags.py     |  7 +-
 .../core_api/routes/public/extra_links.py          | 10 ++-
 .../api_fastapi/core_api/routes/public/log.py      |  2 +-
 .../core_api/routes/public/task_instances.py       | 75 +++++++++++++---------
 .../api_fastapi/core_api/routes/public/tasks.py    |  9 +--
 .../api_fastapi/core_api/routes/public/xcom.py     | 20 ++++--
 .../api_fastapi/core_api/routes/ui/assets.py       |  2 +-
 .../api_fastapi/core_api/routes/ui/calendar.py     |  7 +-
 .../core_api/services/public/task_instances.py     |  2 +-
 .../api_fastapi/execution_api/routes/dag_runs.py   |  4 +-
 .../execution_api/routes/task_instances.py         | 28 ++++----
 .../src/airflow/jobs/scheduler_job_runner.py       | 59 +++--------------
 airflow-core/src/airflow/models/dag.py             | 10 +--
 airflow-core/src/airflow/models/dagbag.py          | 63 ++++++++++++++++++
 airflow-core/src/airflow/models/taskinstance.py    | 14 ++--
 .../airflow/serialization/serialized_objects.py    |  9 +++
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  4 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  4 +-
 .../tests/unit/api_fastapi/common/test_dagbag.py   |  4 +-
 airflow-core/tests/unit/api_fastapi/conftest.py    |  4 +-
 .../core_api/routes/public/test_dag_sources.py     |  6 +-
 .../core_api/routes/public/test_extra_links.py     | 16 ++---
 .../api_fastapi/core_api/routes/public/test_log.py |  6 +-
 .../core_api/routes/public/test_task_instances.py  | 14 +---
 .../core_api/routes/public/test_tasks.py           | 19 +++---
 .../core_api/routes/public/test_xcom.py            |  4 +-
 .../versions/v2025_04_28/test_task_instances.py    |  2 +-
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 10 +--
 .../src/airflowctl/api/datamodels/generated.py     |  4 +-
 37 files changed, 256 insertions(+), 208 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/app.py 
b/airflow-core/src/airflow/api_fastapi/app.py
index 17de4f677cd..f515be6992c 100644
--- a/airflow-core/src/airflow/api_fastapi/app.py
+++ b/airflow-core/src/airflow/api_fastapi/app.py
@@ -84,10 +84,8 @@ def create_app(apps: str = "all") -> FastAPI:
     dag_bag = create_dag_bag()
 
     if "execution" in apps_list or "all" in apps_list:
-        from airflow.jobs.scheduler_job_runner import SchedulerDagBag
-
         task_exec_api_app = create_task_execution_api_app()
-        task_exec_api_app.state.dag_bag = SchedulerDagBag()
+        task_exec_api_app.state.dag_bag = dag_bag
         init_error_handlers(task_exec_api_app)
         app.mount("/execution", task_exec_api_app)
 
diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py 
b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
index 479ef5cd7a5..47dabcd0f6a 100644
--- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
+++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
@@ -20,16 +20,15 @@ from typing import Annotated
 
 from fastapi import Depends, Request
 
-from airflow.models.dagbag import DagBag
-from airflow.settings import DAGS_FOLDER
+from airflow.models.dagbag import SchedulerDagBag
 
 
-def create_dag_bag() -> DagBag:
+def create_dag_bag() -> SchedulerDagBag:
     """Create DagBag to retrieve DAGs from the database."""
-    return DagBag(DAGS_FOLDER, read_dags_from_db=True)
+    return SchedulerDagBag()
 
 
-def dag_bag_from_app(request: Request) -> DagBag:
+def dag_bag_from_app(request: Request) -> SchedulerDagBag:
     """
     FastAPI dependency resolver that returns the shared DagBag instance from 
app.state.
 
@@ -39,4 +38,4 @@ def dag_bag_from_app(request: Request) -> DagBag:
     return request.app.state.dag_bag
 
 
-DagBagDep = Annotated[DagBag, Depends(dag_bag_from_app)]
+DagBagDep = Annotated[SchedulerDagBag, Depends(dag_bag_from_app)]
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 686e21b444a..901fc2a53a8 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -57,7 +57,7 @@ class DAGRunClearBody(StrictBaseModel):
     only_failed: bool = False
     run_on_latest_version: bool = Field(
         default=False,
-        description="(Experimental) Run on the latest bundle version of the 
DAG after clearing the DAG Run.",
+        description="(Experimental) Run on the latest bundle version of the 
Dag after clearing the Dag Run.",
     )
 
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index bb04e829652..3611df275e7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -181,7 +181,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
     include_past: bool = False
     run_on_latest_version: bool = Field(
         default=False,
-        description="(Experimental) Run on the latest bundle version of the 
DAG after "
+        description="(Experimental) Run on the latest bundle version of the 
dag after "
         "clearing the task instances.",
     )
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 856d8a83cf2..333263f1b41 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -8690,7 +8690,7 @@ components:
         run_on_latest_version:
           type: boolean
           title: Run On Latest Version
-          description: (Experimental) Run on the latest bundle version of the 
DAG
+          description: (Experimental) Run on the latest bundle version of the 
dag
             after clearing the task instances.
           default: false
       additionalProperties: false
@@ -9333,8 +9333,8 @@ components:
         run_on_latest_version:
           type: boolean
           title: Run On Latest Version
-          description: (Experimental) Run on the latest bundle version of the 
DAG
-            after clearing the DAG Run.
+          description: (Experimental) Run on the latest bundle version of the 
Dag
+            after clearing the Dag Run.
           default: false
       additionalProperties: false
       type: object
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index ace75342932..08b25cc2fd3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -371,7 +371,7 @@ def materialize_asset(
         )
 
     dag: DAG | None
-    if not (dag := dag_bag.get_dag(dag_id)):
+    if not (dag := dag_bag.get_latest_version_of_dag(dag_id, session)):
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with ID 
`{dag_id}` was not found")
 
     return dag.create_dagrun(
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 643ee1e528f..f7dfef23deb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -79,7 +79,7 @@ from airflow.api_fastapi.core_api.services.public.dag_run 
import DagRunWaiter
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.exceptions import ParamValidationError
 from airflow.listeners.listener import get_listener_manager
-from airflow.models import DAG, DagModel, DagRun
+from airflow.models import DagModel, DagRun
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
@@ -168,7 +168,7 @@ def patch_dag_run(
             f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` 
was not found",
         )
 
-    dag: DAG = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_dag_for_run(dag_run, session=session)
 
     if not dag:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
@@ -274,9 +274,11 @@ def clear_dag_run(
             f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` 
was not found",
         )
 
-    dag: DAG = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_dag_for_run(dag_run, session=session)
 
     if body.dry_run:
+        if not dag:
+            raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id 
{dag_id} was not found")
         task_instances = dag.clear(
             run_id=dag_run_id,
             task_ids=None,
@@ -290,6 +292,8 @@ def clear_dag_run(
             task_instances=cast("list[TaskInstanceResponse]", task_instances),
             total_entries=len(task_instances),
         )
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
     dag.clear(
         run_id=dag_run_id,
         task_ids=None,
@@ -352,7 +356,7 @@ def get_dag_runs(
     query = select(DagRun)
 
     if dag_id != "~":
-        dag: DAG = dag_bag.get_dag(dag_id)
+        dag = dag_bag.get_latest_version_of_dag(dag_id, session)
         if not dag:
             raise HTTPException(status.HTTP_404_NOT_FOUND, f"The DAG with 
dag_id: `{dag_id}` was not found")
 
@@ -417,7 +421,9 @@ def trigger_dag_run(
         )
 
     try:
-        dag: DAG = dag_bag.get_dag(dag_id)
+        dag = dag_bag.get_latest_version_of_dag(dag_id, session)
+        if not dag:
+            raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with dag_id: 
'{dag_id}' not found")
         params = body.validate_context(dag)
 
         dag_run = dag.create_dagrun(
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_versions.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_versions.py
index df0d46142a3..e71db7a4ff4 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_versions.py
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_versions.py
@@ -39,7 +39,6 @@ from airflow.api_fastapi.core_api.datamodels.dag_versions 
import (
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
 from airflow.api_fastapi.core_api.security import requires_access_dag
-from airflow.models.dag import DAG
 from airflow.models.dag_version import DagVersion
 
 dag_versions_router = AirflowRouter(tags=["DagVersion"], 
prefix="/dags/{dag_id}/dagVersions")
@@ -112,7 +111,7 @@ def get_dag_versions(
     query = select(DagVersion).options(joinedload(DagVersion.dag_model))
 
     if dag_id != "~":
-        dag: DAG = dag_bag.get_dag(dag_id)
+        dag = dag_bag.get_latest_version_of_dag(dag_id, session)
         if not dag:
             raise HTTPException(status.HTTP_404_NOT_FOUND, f"The DAG with 
dag_id: `{dag_id}` was not found")
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
index 7a63eece51c..649f744207b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
@@ -67,7 +67,7 @@ from airflow.api_fastapi.core_api.security import (
 )
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.exceptions import AirflowException, DagNotFound
-from airflow.models import DAG, DagModel
+from airflow.models import DagModel
 from airflow.models.dag_favorite import DagFavorite
 from airflow.models.dagrun import DagRun
 
@@ -172,7 +172,7 @@ def get_dag(
     dag_bag: DagBagDep,
 ) -> DAGResponse:
     """Get basic information about a DAG."""
-    dag: DAG = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_latest_version_of_dag(dag_id, session)
     if not dag:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
 
@@ -199,8 +199,7 @@ def get_dag(
 )
 def get_dag_details(dag_id: str, session: SessionDep, dag_bag: DagBagDep) -> 
DAGDetailsResponse:
     """Get details of DAG."""
-    # todo: can we use lazy deser dag here?
-    dag: DAG = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_latest_version_of_dag(dag_id, session)
     if not dag:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py
index 1be9f580b06..085f61c2803 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py
@@ -29,12 +29,12 @@ from airflow.api_fastapi.core_api.datamodels.extra_links 
import ExtraLinkCollect
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
 from airflow.api_fastapi.core_api.security import DagAccessEntity, 
requires_access_dag
 from airflow.exceptions import TaskNotFound
+from airflow.models import DagRun
 
 if TYPE_CHECKING:
     from airflow.models.mappedoperator import MappedOperator
     from airflow.serialization.serialized_objects import SerializedBaseOperator
 
-
 extra_links_router = AirflowRouter(
     tags=["Extra Links"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links"
 )
@@ -57,7 +57,13 @@ def get_extra_links(
     """Get extra links for task instance."""
     from airflow.models.taskinstance import TaskInstance
 
-    if (dag := dag_bag.get_dag(dag_id)) is None:
+    dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, 
DagRun.run_id == dag_run_id))
+
+    if dag_run:
+        dag = dag_bag.get_dag_for_run(dag_run, session=session)
+    else:
+        dag = dag_bag.get_latest_version_of_dag(dag_id, session=session)
+    if not dag:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with ID = 
{dag_id} not found")
 
     try:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
index 688a563d2b6..5f1d04a244a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
@@ -139,7 +139,7 @@ def get_log(
         metadata["end_of_log"] = True
         raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not 
found")
 
-    dag = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_dag_for_run(ti.dag_run, session=session)
     if dag:
         with contextlib.suppress(TaskNotFound):
             ti.task = dag.get_task(ti.task_id)
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 07885a7a667..c5e612c0036 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -181,7 +181,11 @@ def get_mapped_task_instances(
     # 0 can mean a mapped TI that expanded to an empty list, so it is not an 
automatic 404
     unfiltered_total_count = get_query_count(query, session=session)
     if unfiltered_total_count == 0:
-        dag = dag_bag.get_dag(dag_id)
+        dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, 
DagRun.run_id == dag_run_id))
+        if dag_run:
+            dag = dag_bag.get_dag_for_run(dag_run, session=session)
+        else:
+            dag = dag_bag.get_latest_version_of_dag(dag_id, session=session)
         if not dag:
             error_message = f"DAG {dag_id} not found"
             raise HTTPException(status.HTTP_404_NOT_FOUND, error_message)
@@ -258,7 +262,8 @@ def get_task_instance_dependencies(
     deps = []
 
     if ti.state in [None, TaskInstanceState.SCHEDULED]:
-        dag = dag_bag.get_dag(ti.dag_id)
+        dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == 
ti.dag_id, DagRun.run_id == ti.run_id))
+        dag = dag_bag.get_dag_for_run(dag_run, session=session)
 
         if dag:
             try:
@@ -437,6 +442,7 @@ def get_task_instances(
     This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve 
Task Instances for all DAGs
     and DAG runs.
     """
+    dag_run = None
     query = (
         select(TI)
         .join(TI.dag_run)
@@ -444,13 +450,6 @@ def get_task_instances(
         .options(joinedload(TI.dag_version))
         .options(joinedload(TI.dag_run).options(joinedload(DagRun.dag_model)))
     )
-
-    if dag_id != "~":
-        dag = dag_bag.get_dag(dag_id)
-        if not dag:
-            raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: 
`{dag_id}` was not found")
-        query = query.where(TI.dag_id == dag_id)
-
     if dag_run_id != "~":
         dag_run = session.scalar(select(DagRun).filter_by(run_id=dag_run_id))
         if not dag_run:
@@ -459,6 +458,14 @@ def get_task_instances(
                 f"DagRun with run_id: `{dag_run_id}` was not found",
             )
         query = query.where(TI.run_id == dag_run_id)
+    if dag_id != "~":
+        if dag_run:
+            dag = dag_bag.get_dag_for_run(dag_run, session)
+        else:
+            dag = dag_bag.get_latest_version_of_dag(dag_id, session)
+        if not dag:
+            raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with dag_id: 
`{dag_id}` was not found")
+        query = query.where(TI.dag_id == dag_id)
 
     task_instance_select, total_entries = paginated_select(
         statement=query,
@@ -654,7 +661,7 @@ def post_clear_task_instances(
     session: SessionDep,
 ) -> TaskInstanceCollectionResponse:
     """Clear task instances."""
-    dag = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_latest_version_of_dag(dag_id, session)
     if not dag:
         error_message = f"DAG {dag_id} not found"
         raise HTTPException(status.HTTP_404_NOT_FOUND, error_message)
@@ -675,11 +682,10 @@ def post_clear_task_instances(
         if dag_run is None:
             error_message = f"Dag Run id {dag_run_id} not found in dag 
{dag_id}"
             raise HTTPException(status.HTTP_404_NOT_FOUND, error_message)
-        # If dag_run_id is provided, we should get the dag from SchedulerDagBag
-        # to ensure we get the right version.
-        from airflow.jobs.scheduler_job_runner import SchedulerDagBag
-
-        dag = SchedulerDagBag().get_dag(dag_run=dag_run, session=session)
+        # Get the specific dag version:
+        dag = dag_bag.get_dag_for_run(dag_run=dag_run, session=session)
+        if not dag:
+            raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG {dag_id} not 
found")
         if past or future:
             raise HTTPException(
                 status.HTTP_400_BAD_REQUEST,
@@ -707,21 +713,30 @@ def post_clear_task_instances(
             # If we had upstream/downstream etc then also include those!
             task_ids.extend(tid for tid in dag.task_dict if tid != task_id)
 
-    task_instances = dag.clear(
-        dry_run=True,
-        run_id=None if past or future else dag_run_id,
-        task_ids=task_ids,
-        dag_bag=dag_bag,
-        session=session,
-        **body.model_dump(
-            include={
-                "start_date",
-                "end_date",
-                "only_failed",
-                "only_running",
-            }
-        ),
-    )
+    # Prepare common parameters
+    common_params = {
+        "dry_run": True,
+        "task_ids": task_ids,
+        "dag_bag": dag_bag,
+        "session": session,
+        "run_on_latest_version": body.run_on_latest_version,
+        "only_failed": body.only_failed,
+        "only_running": body.only_running,
+    }
+
+    if dag_run_id is not None and not (past or future):
+        # Use run_id-based clearing when we have a specific dag_run_id and not 
using past/future
+        task_instances = dag.clear(
+            **common_params,
+            run_id=dag_run_id,
+        )
+    else:
+        # Use date-based clearing when no dag_run_id or when past/future is 
specified
+        task_instances = dag.clear(
+            **common_params,
+            start_date=body.start_date,
+            end_date=body.end_date,
+        )
 
     if not dry_run:
         clear_task_instances(
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/tasks.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/tasks.py
index c8ce20ab137..aecd4c4be44 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/tasks.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/tasks.py
@@ -24,12 +24,12 @@ from fastapi import Depends, HTTPException, status
 
 from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
 from airflow.api_fastapi.common.dagbag import DagBagDep
+from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.tasks import 
TaskCollectionResponse, TaskResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
 from airflow.api_fastapi.core_api.security import requires_access_dag
 from airflow.exceptions import TaskNotFound
-from airflow.models import DAG
 
 tasks_router = AirflowRouter(tags=["Task"], prefix="/dags/{dag_id}/tasks")
 
@@ -47,10 +47,11 @@ tasks_router = AirflowRouter(tags=["Task"], 
prefix="/dags/{dag_id}/tasks")
 def get_tasks(
     dag_id: str,
     dag_bag: DagBagDep,
+    session: SessionDep,
     order_by: str = "task_id",
 ) -> TaskCollectionResponse:
     """Get tasks for DAG."""
-    dag: DAG = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_latest_version_of_dag(dag_id, session)
     if not dag:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
     try:
@@ -73,9 +74,9 @@ def get_tasks(
     ),
     dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK))],
 )
-def get_task(dag_id: str, task_id, dag_bag: DagBagDep) -> TaskResponse:
+def get_task(dag_id: str, task_id, session: SessionDep, dag_bag: DagBagDep) -> 
TaskResponse:
     """Get simplified representation of a task."""
-    dag: DAG = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_latest_version_of_dag(dag_id, session)
     if not dag:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
     try:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
index eed0309da2f..b886ce8f950 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
@@ -39,7 +39,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_
 from airflow.api_fastapi.core_api.security import ReadableXComFilterDep, 
requires_access_dag
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.exceptions import TaskNotFound
-from airflow.models import DAG, DagRun as DR
+from airflow.models import DagRun as DR
 from airflow.models.xcom import XComModel
 
 xcom_router = AirflowRouter(
@@ -188,8 +188,14 @@ def create_xcom_entry(
     dag_bag: DagBagDep,
 ) -> XComResponseNative:
     """Create an XCom entry."""
+    from airflow.models.dagrun import DagRun
+
+    dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, 
DagRun.run_id == dag_run_id))
     # Validate DAG ID
-    dag: DAG = dag_bag.get_dag(dag_id)
+    if dag_run:
+        dag = dag_bag.get_dag_for_run(dag_run, session)
+    else:
+        dag = dag_bag.get_latest_version_of_dag(dag_id, session)
     if not dag:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with ID: 
`{dag_id}` was not found")
 
@@ -198,15 +204,15 @@ def create_xcom_entry(
         dag.get_task(task_id)
     except TaskNotFound:
         raise HTTPException(
-            status.HTTP_404_NOT_FOUND, f"Task with ID: `{task_id}` not found 
in DAG: `{dag_id}`"
+            status.HTTP_404_NOT_FOUND, f"Task with ID: `{task_id}` not found 
in dag: `{dag_id}`"
         )
 
     # Validate DAG Run ID
-    dag_run = dag.get_dagrun(dag_run_id, session)
     if not dag_run:
-        raise HTTPException(
-            status.HTTP_404_NOT_FOUND, f"DAG Run with ID: `{dag_run_id}` not 
found for DAG: `{dag_id}`"
-        )
+        if not dag_run:
+            raise HTTPException(
+                status.HTTP_404_NOT_FOUND, f"Dag Run with ID: `{dag_run_id}` 
not found for dag: `{dag_id}`"
+            )
 
     # Check existing XCom
     already_existing_query = XComModel.get_many(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
index fc68f2f1d06..e1976d15fb2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
@@ -39,7 +39,7 @@ def next_run_assets(
     dag_bag: DagBagDep,
     session: SessionDep,
 ) -> dict:
-    dag = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_latest_version_of_dag(dag_id, session)
 
     if not dag:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"can't find dag 
{dag_id}")
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/calendar.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/calendar.py
index cdb1902904b..62bf90dfd24 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/calendar.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/calendar.py
@@ -18,7 +18,8 @@ from __future__ import annotations
 
 from typing import Annotated, Literal
 
-from fastapi import Depends
+from fastapi import Depends, HTTPException
+from starlette import status
 
 from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
 from airflow.api_fastapi.common.dagbag import DagBagDep
@@ -58,7 +59,9 @@ def get_calendar(
     granularity: Literal["hourly", "daily"] = "daily",
 ) -> CalendarTimeRangeCollectionResponse:
     """Get calendar data for a DAG including historical and planned DAG 
runs."""
-    dag = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_latest_version_of_dag(dag_id, session)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with dag_id: 
'{dag_id}' not found")
     calendar_service = CalendarService()
 
     return calendar_service.get_calendar_data(
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
index 4a25e07340b..65b09284132 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
@@ -56,7 +56,7 @@ def _patch_ti_validate_request(
     map_index: int | None = -1,
     update_mask: list[str] | None = Query(None),
 ) -> tuple[DAG, list[TI], dict]:
-    dag = dag_bag.get_dag(dag_id)
+    dag = dag_bag.get_latest_version_of_dag(dag_id, session)
     if not dag:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG {dag_id} not 
found")
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 332fe8cd500..d3650868fa1 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -123,13 +123,13 @@ def clear_dag_run(
                 "message": f"DAG with dag_id: '{dag_id}' has import errors and 
cannot be triggered",
             },
         )
-    from airflow.jobs.scheduler_job_runner import SchedulerDagBag
+    from airflow.models.dagbag import SchedulerDagBag
 
     dag_run = session.scalar(
         select(DagRunModel).where(DagRunModel.dag_id == dag_id, 
DagRunModel.run_id == run_id)
     )
     dag_bag = SchedulerDagBag()
-    dag = dag_bag.get_dag(dag_run=dag_run, session=session)
+    dag = dag_bag.get_dag_for_run(dag_run=dag_run, session=session)
     if not dag:
         raise HTTPException(
             status.HTTP_404_NOT_FOUND,
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 09ef33d10b1..de196f5ff64 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -29,7 +29,7 @@ from uuid import UUID
 import attrs
 import structlog
 from cadwyn import VersionedAPIRouter
-from fastapi import Body, Depends, HTTPException, Query, status
+from fastapi import Body, HTTPException, Query, status
 from pydantic import JsonValue
 from sqlalchemy import func, or_, tuple_, update
 from sqlalchemy.exc import NoResultFound, SQLAlchemyError
@@ -38,7 +38,7 @@ from sqlalchemy.sql import select
 from structlog.contextvars import bind_contextvars
 
 from airflow._shared.timezones import timezone
-from airflow.api_fastapi.common.dagbag import dag_bag_from_app
+from airflow.api_fastapi.common.dagbag import DagBagDep
 from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.common.types import UtcDateTime
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
@@ -59,7 +59,7 @@ from 
airflow.api_fastapi.execution_api.datamodels.taskinstance import (
 from airflow.api_fastapi.execution_api.deps import JWTBearerTIPathDep
 from airflow.exceptions import TaskNotFound
 from airflow.models.asset import AssetActive
-from airflow.models.dagbag import DagBag
+from airflow.models.dagbag import SchedulerDagBag
 from airflow.models.dagrun import DagRun as DR
 from airflow.models.taskinstance import TaskInstance as TI, 
_stop_remaining_tasks
 from airflow.models.taskreschedule import TaskReschedule
@@ -76,10 +76,6 @@ if TYPE_CHECKING:
     from airflow.models.expandinput import SchedulerExpandInput
     from airflow.sdk.types import Operator
 
-from airflow.jobs.scheduler_job_runner import SchedulerDagBag
-
-SchedulerDagBagDep = Annotated[SchedulerDagBag, Depends(dag_bag_from_app)]
-
 router = VersionedAPIRouter()
 
 ti_id_router = VersionedAPIRouter(
@@ -107,7 +103,7 @@ def ti_run(
     task_instance_id: UUID,
     ti_run_payload: Annotated[TIEnterRunningPayload, Body()],
     session: SessionDep,
-    dag_bag: SchedulerDagBagDep,
+    dag_bag: DagBagDep,
 ) -> TIRunContext:
     """
     Run a TaskInstance.
@@ -258,7 +254,7 @@ def ti_run(
             or 0
         )
 
-        if dag := dag_bag.get_dag(dag_run=dr, session=session):
+        if dag := dag_bag.get_dag_for_run(dag_run=dr, session=session):
             upstream_map_indexes = dict(
                 _get_upstream_map_indexes(dag.get_task(ti.task_id), 
ti.map_index, ti.run_id, session)
             )
@@ -333,7 +329,7 @@ def ti_update_state(
     task_instance_id: UUID,
     ti_patch_payload: Annotated[TIStateUpdate, Body()],
     session: SessionDep,
-    dag_bag: SchedulerDagBagDep,
+    dag_bag: DagBagDep,
 ):
     """
     Update the state of a TaskInstance.
@@ -420,9 +416,9 @@ def ti_update_state(
         )
 
 
-def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, 
dag_bag: SchedulerDagBagDep) -> None:
+def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, 
dag_bag: DagBagDep) -> None:
     dr = ti.dag_run
-    ser_dag = dag_bag.get_dag(dag_run=dr, session=session)
+    ser_dag = dag_bag.get_dag_for_run(dag_run=dr, session=session)
     if ser_dag and getattr(ser_dag, "fail_fast", False):
         task_dict = getattr(ser_dag, "task_dict")
         task_teardown_map = {k: v.is_teardown for k, v in task_dict.items()}
@@ -436,7 +432,7 @@ def _create_ti_state_update_query_and_update_state(
     query: Update,
     updated_state,
     session: SessionDep,
-    dag_bag: SchedulerDagBagDep,
+    dag_bag: DagBagDep,
     dag_id: str,
 ) -> tuple[Update, TaskInstanceState]:
     if isinstance(ti_patch_payload, (TITerminalStatePayload, 
TIRetryStatePayload, TISuccessStatePayload)):
@@ -854,7 +850,7 @@ def _is_eligible_to_retry(state: str, try_number: int, 
max_tries: int) -> bool:
 
 def _get_group_tasks(dag_id: str, task_group_id: str, session: SessionDep, 
logical_dates=None, run_ids=None):
     # Get all tasks in the task group
-    dag = DagBag(read_dags_from_db=True).get_dag(dag_id, session)
+    dag = SchedulerDagBag().get_latest_version_of_dag(dag_id, session)
     if not dag:
         raise HTTPException(
             status.HTTP_404_NOT_FOUND,
@@ -897,7 +893,7 @@ def _get_group_tasks(dag_id: str, task_group_id: str, 
session: SessionDep, logic
 def validate_inlets_and_outlets(
     task_instance_id: UUID,
     session: SessionDep,
-    dag_bag: SchedulerDagBagDep,
+    dag_bag: DagBagDep,
 ) -> InactiveAssetsResponse:
     """Validate whether there're inactive assets in inlets and outlets of a 
given task instance."""
     ti_id_str = str(task_instance_id)
@@ -916,7 +912,7 @@ def validate_inlets_and_outlets(
 
     if not ti.task:
         dr = ti.dag_run
-        dag = dag_bag.get_dag(dag_run=dr, session=session)
+        dag = dag_bag.get_dag_for_run(dag_run=dr, session=session)
         if dag:
             with contextlib.suppress(TaskNotFound):
                 ti.task = dag.get_task(ti.task_id)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index e3411d42417..961d85bc2fe 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -60,6 +60,7 @@ from airflow.models.asset import (
 from airflow.models.backfill import Backfill
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dag_version import DagVersion
+from airflow.models.dagbag import SchedulerDagBag
 from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning, DagWarningType
 from airflow.models.serialized_dag import SerializedDagModel
@@ -104,48 +105,6 @@ TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued 
reschedule"
 """:meta private:"""
 
 
-class SchedulerDagBag:
-    """
-    Internal class for retrieving and caching dags in the scheduler.
-
-    :meta private:
-    """
-
-    def __init__(self):
-        self._dags: dict[str, DAG] = {}  # dag_version_id to dag
-
-    def _get_dag(self, version_id: str, session: Session) -> DAG | None:
-        if dag := self._dags.get(version_id):
-            return dag
-        dag_version = session.get(DagVersion, version_id, 
options=[joinedload(DagVersion.serialized_dag)])
-        if not dag_version:
-            return None
-        serdag = dag_version.serialized_dag
-        if not serdag:
-            return None
-        serdag.load_op_links = False
-        dag = serdag.dag
-        if not dag:
-            return None
-        self._dags[version_id] = dag
-        return dag
-
-    @staticmethod
-    def _version_from_dag_run(dag_run, latest, session):
-        if latest or not dag_run.bundle_version:
-            dag_version = DagVersion.get_latest_version(dag_id=dag_run.dag_id, 
session=session)
-            if dag_version:
-                return dag_version
-
-        return dag_run.created_dag_version
-
-    def get_dag(self, dag_run: DagRun, session: Session, latest=False) -> DAG 
| None:
-        version = self._version_from_dag_run(dag_run=dag_run, latest=latest, 
session=session)
-        if not version:
-            return None
-        return self._get_dag(version_id=version.id, session=session)
-
-
 def _get_current_dag(dag_id: str, session: Session) -> DAG | None:
     serdag = SerializedDagModel.get(dag_id=dag_id, session=session)  # grabs 
the latest version
     if not serdag:
@@ -253,7 +212,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         if log:
             self._log = log
 
-        self.scheduler_dag_bag = SchedulerDagBag()
+        self.scheduler_dag_bag = SchedulerDagBag(load_op_links=False)
 
     @provide_session
     def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
@@ -544,7 +503,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 if task_instance.dag_model.has_task_concurrency_limits:
                     # Many dags don't have a task_concurrency, so where we can 
avoid loading the full
                     # serialized DAG the better.
-                    serialized_dag = self.scheduler_dag_bag.get_dag(
+                    serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
                         dag_run=task_instance.dag_run, session=session
                     )
                     # If the dag is missing, fail the task and continue to the 
next task.
@@ -905,7 +864,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
                 # Get task from the Serialized DAG
                 try:
-                    dag = scheduler_dag_bag.get_dag(dag_run=ti.dag_run, 
session=session)
+                    dag = 
scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
                     cls.logger().error(
                         "DAG '%s' for task instance %s not found in 
serialized_dag table",
                         ti.dag_id,
@@ -1034,7 +993,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 .group_by(DagRun)
             )
             for dag_run in paused_runs:
-                dag = self.scheduler_dag_bag.get_dag(dag_run=dag_run, 
session=session)
+                dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, 
session=session)
                 if dag is not None:
                     dag_run.dag = dag
                     _, callback_to_run = 
dag_run.update_state(execute_callbacks=False, session=session)
@@ -1386,7 +1345,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # Send the callbacks after we commit to ensure the context is up to 
date when it gets run
         # cache saves time during scheduling of many dag_runs for same dag
         cached_get_dag: Callable[[DagRun], DAG | None] = lru_cache()(
-            partial(self.scheduler_dag_bag.get_dag, session=session)
+            partial(self.scheduler_dag_bag.get_dag_for_run, session=session)
         )
         for dag_run, callback_to_run in callback_tuples:
             dag = cached_get_dag(dag_run)
@@ -1728,7 +1687,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
         # cache saves time during scheduling of many dag_runs for same dag
         cached_get_dag: Callable[[DagRun], DAG | None] = lru_cache()(
-            partial(self.scheduler_dag_bag.get_dag, session=session)
+            partial(self.scheduler_dag_bag.get_dag_for_run, session=session)
         )
 
         span = Trace.get_current_span()
@@ -1818,7 +1777,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
             callback: DagCallbackRequest | None = None
 
-            dag = dag_run.dag = 
self.scheduler_dag_bag.get_dag(dag_run=dag_run, session=session)
+            dag = dag_run.dag = 
self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, session=session)
             dag_model = DM.get_dagmodel(dag_run.dag_id, session)
 
             if not dag or not dag_model:
@@ -1936,7 +1895,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             self.log.debug("DAG %s not changed structure, skipping 
dagrun.verify_integrity", dag_run.dag_id)
             return True
         # Refresh the DAG
-        dag_run.dag = self.scheduler_dag_bag.get_dag(dag_run=dag_run, 
session=session)
+        dag_run.dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, 
session=session)
         if not dag_run.dag:
             return False
         # Select all TIs in State.unfinished and update the dag_version_id
diff --git a/airflow-core/src/airflow/models/dag.py 
b/airflow-core/src/airflow/models/dag.py
index 5c72eb83665..5d68ad226d2 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -1282,11 +1282,11 @@ class DAG(TaskSDKDag, LoggingMixin):
         only_running: bool = False,
         confirm_prompt: bool = False,
         dag_run_state: DagRunState = DagRunState.QUEUED,
-        run_on_latest_version: bool = False,
         session: Session = NEW_SESSION,
         dag_bag: DagBag | None = None,
         exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
         exclude_run_ids: frozenset[str] | None = frozenset(),
+        run_on_latest_version: bool = False,
     ) -> list[TaskInstance]: ...  # pragma: no cover
 
     @overload
@@ -1300,11 +1300,11 @@ class DAG(TaskSDKDag, LoggingMixin):
         confirm_prompt: bool = False,
         dag_run_state: DagRunState = DagRunState.QUEUED,
         dry_run: Literal[False] = False,
-        run_on_latest_version: bool = False,
         session: Session = NEW_SESSION,
         dag_bag: DagBag | None = None,
         exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
         exclude_run_ids: frozenset[str] | None = frozenset(),
+        run_on_latest_version: bool = False,
     ) -> int: ...  # pragma: no cover
 
     @overload
@@ -1319,11 +1319,11 @@ class DAG(TaskSDKDag, LoggingMixin):
         only_running: bool = False,
         confirm_prompt: bool = False,
         dag_run_state: DagRunState = DagRunState.QUEUED,
-        run_on_latest_version: bool = False,
         session: Session = NEW_SESSION,
         dag_bag: DagBag | None = None,
         exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
         exclude_run_ids: frozenset[str] | None = frozenset(),
+        run_on_latest_version: bool = False,
     ) -> list[TaskInstance]: ...  # pragma: no cover
 
     @overload
@@ -1338,11 +1338,11 @@ class DAG(TaskSDKDag, LoggingMixin):
         confirm_prompt: bool = False,
         dag_run_state: DagRunState = DagRunState.QUEUED,
         dry_run: Literal[False] = False,
-        run_on_latest_version: bool = False,
         session: Session = NEW_SESSION,
         dag_bag: DagBag | None = None,
         exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
         exclude_run_ids: frozenset[str] | None = frozenset(),
+        run_on_latest_version: bool = False,
     ) -> int: ...  # pragma: no cover
 
     @provide_session
@@ -1358,11 +1358,11 @@ class DAG(TaskSDKDag, LoggingMixin):
         confirm_prompt: bool = False,
         dag_run_state: DagRunState = DagRunState.QUEUED,
         dry_run: bool = False,
-        run_on_latest_version: bool = False,
         session: Session = NEW_SESSION,
         dag_bag: DagBag | None = None,
         exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
         exclude_run_ids: frozenset[str] | None = frozenset(),
+        run_on_latest_version: bool = False,
     ) -> int | Iterable[TaskInstance]:
         """
         Clear a set of task instances associated with the current dag for a 
specified date range.
diff --git a/airflow-core/src/airflow/models/dagbag.py 
b/airflow-core/src/airflow/models/dagbag.py
index a6ee7403b04..23c6645c53c 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -37,6 +37,7 @@ from sqlalchemy import (
     Column,
     String,
 )
+from sqlalchemy.orm import joinedload
 from tabulate import tabulate
 
 from airflow import settings
@@ -52,6 +53,7 @@ from airflow.exceptions import (
 )
 from airflow.listeners.listener import get_listener_manager
 from airflow.models.base import Base, StringID
+from airflow.models.dag_version import DagVersion
 from airflow.stats import Stats
 from airflow.utils.dag_cycle_tester import check_cycle
 from airflow.utils.docs import get_docs_url
@@ -71,6 +73,7 @@ if TYPE_CHECKING:
 
     from sqlalchemy.orm import Session
 
+    from airflow.models import DagRun
     from airflow.models.dag import DAG
     from airflow.models.dagwarning import DagWarning
     from airflow.utils.types import ArgNotSet
@@ -712,6 +715,66 @@ class DagBag(LoggingMixin):
         )
 
 
+class SchedulerDagBag:
+    """
+    Internal class for retrieving and caching dags in the scheduler.
+
+    :meta private:
+    """
+
+    def __init__(self, load_op_links: bool = True):
+        self._dags: dict[str, DAG] = {}  # dag_version_id to dag
+        self.load_op_links = load_op_links
+
+    def _get_dag(self, version_id: str, session: Session) -> DAG | None:
+        if dag := self._dags.get(version_id):
+            return dag
+        dag_version = session.get(DagVersion, version_id, 
options=[joinedload(DagVersion.serialized_dag)])
+        if not dag_version:
+            return None
+        serdag = dag_version.serialized_dag
+        if not serdag:
+            return None
+        serdag.load_op_links = self.load_op_links
+        dag = serdag.dag
+        if not dag:
+            return None
+        self._dags[version_id] = dag
+        return dag
+
+    @staticmethod
+    def _version_from_dag_run(dag_run, session):
+        if not dag_run.bundle_version:
+            dag_version = DagVersion.get_latest_version(dag_id=dag_run.dag_id, 
session=session)
+            if dag_version:
+                return dag_version
+
+        return dag_run.created_dag_version
+
+    def get_dag_for_run(self, dag_run: DagRun, session: Session) -> DAG | None:
+        version = self._version_from_dag_run(dag_run=dag_run, session=session)
+        if not version:
+            return None
+        return self._get_dag(version_id=version.id, session=session)
+
+    def get_latest_version_of_dag(self, dag_id: str, session: Session) -> DAG 
| None:
+        """
+        Get the latest version of a DAG by its ID.
+
+        This method retrieves the latest version of the DAG with the given ID.
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        serdag = SerializedDagModel.get(dag_id, session=session)
+        if not serdag:
+            return None
+        serdag.load_op_links = self.load_op_links
+        dag = serdag.dag
+
+        self._dags[serdag.dag_version.id] = dag
+        return dag
+
+
 def generate_md5_hash(context):
     bundle_name = context.get_current_parameters()["bundle_name"]
     relative_fileloc = context.get_current_parameters()["relative_fileloc"]
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 9059c6d3986..53ca7c6589e 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -223,9 +223,9 @@ def clear_task_instances(
     :meta private:
     """
     task_instance_ids: list[str] = []
-    from airflow.jobs.scheduler_job_runner import SchedulerDagBag
+    from airflow.models.dagbag import SchedulerDagBag
 
-    scheduler_dagbag = SchedulerDagBag()
+    scheduler_dagbag = SchedulerDagBag(load_op_links=False)
 
     for ti in tis:
         task_instance_ids.append(ti.id)
@@ -236,7 +236,10 @@ def clear_task_instances(
             ti.state = TaskInstanceState.RESTARTING
         else:
             dr = ti.dag_run
-            ti_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session, 
latest=run_on_latest_version)
+            if run_on_latest_version:
+                ti_dag = scheduler_dagbag.get_latest_version_of_dag(ti.dag_id, 
session=session)
+            else:
+                ti_dag = scheduler_dagbag.get_dag_for_run(dag_run=dr, 
session=session)
             if not ti_dag:
                 log.warning("No serialized dag found for dag '%s'", dr.dag_id)
             task_id = ti.task_id
@@ -279,7 +282,10 @@ def clear_task_instances(
             if dr.state in State.finished_dr_states:
                 dr.state = dag_run_state
                 dr.start_date = timezone.utcnow()
-                dr_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session, 
latest=run_on_latest_version)
+                if run_on_latest_version:
+                    dr_dag = 
scheduler_dagbag.get_latest_version_of_dag(dr.dag_id, session=session)
+                else:
+                    dr_dag = scheduler_dagbag.get_dag_for_run(dag_run=dr, 
session=session)
                 if not dr_dag:
                     log.warning("No serialized dag found for dag '%s'", 
dr.dag_id)
                 if dr_dag and not dr_dag.disable_bundle_versioning and 
run_on_latest_version:
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py 
b/airflow-core/src/airflow/serialization/serialized_objects.py
index 3932344c4c3..87297df4f33 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1336,6 +1336,15 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
     def expand_start_trigger_args(self, *, context: Context) -> 
StartTriggerArgs | None:
         return self.start_trigger_args
 
+    def __getattr__(self, name):
+        # Handle missing attributes with task_type instead of 
SerializedBaseOperator
+        # Don't intercept special methods that Python internals might check
+        if name.startswith("__") and name.endswith("__"):
+            # For special methods, raise the original error
+            raise AttributeError(f"'{self.__class__.__name__}' object has no 
attribute '{name}'")
+        # For regular attributes, use task_type in the error message
+        raise AttributeError(f"'{self.task_type}' object has no attribute 
'{name}'")
+
     @classmethod
     def serialize_mapped_operator(cls, op: MappedOperator) -> dict[str, Any]:
         serialized_op = cls._serialize_node(op)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index d836de1bfce..4a9683afdc9 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1209,7 +1209,7 @@ export const $ClearTaskInstancesBody = {
         run_on_latest_version: {
             type: 'boolean',
             title: 'Run On Latest Version',
-            description: '(Experimental) Run on the latest bundle version of 
the DAG after clearing the task instances.',
+            description: '(Experimental) Run on the latest bundle version of 
the dag after clearing the task instances.',
             default: false
         }
     },
@@ -2187,7 +2187,7 @@ export const $DAGRunClearBody = {
         run_on_latest_version: {
             type: 'boolean',
             title: 'Run On Latest Version',
-            description: '(Experimental) Run on the latest bundle version of 
the DAG after clearing the DAG Run.',
+            description: '(Experimental) Run on the latest bundle version of 
the Dag after clearing the Dag Run.',
             default: false
         }
     },
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 47d4b65cd89..b777ca39767 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -390,7 +390,7 @@ export type ClearTaskInstancesBody = {
     include_future?: boolean;
     include_past?: boolean;
     /**
-     * (Experimental) Run on the latest bundle version of the DAG after 
clearing the task instances.
+     * (Experimental) Run on the latest bundle version of the dag after 
clearing the task instances.
      */
     run_on_latest_version?: boolean;
 };
@@ -597,7 +597,7 @@ export type DAGRunClearBody = {
     dry_run?: boolean;
     only_failed?: boolean;
     /**
-     * (Experimental) Run on the latest bundle version of the DAG after 
clearing the DAG Run.
+     * (Experimental) Run on the latest bundle version of the Dag after 
clearing the Dag Run.
      */
     run_on_latest_version?: boolean;
 };
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py 
b/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
index b0342c29180..be16cd276a9 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
@@ -48,13 +48,13 @@ class TestDagBagSingleton:
         """Patch DagBag once before app is created, and reset counter."""
         self.dagbag_call_counter["count"] = 0
 
-        from airflow.models.dagbag import DagBag as RealDagBag
+        from airflow.models.dagbag import SchedulerDagBag as RealDagBag
 
         def factory(*args, **kwargs):
             self.dagbag_call_counter["count"] += 1
             return RealDagBag(*args, **kwargs)
 
-        with mock.patch("airflow.api_fastapi.common.dagbag.DagBag", 
side_effect=factory):
+        with mock.patch("airflow.api_fastapi.common.dagbag.SchedulerDagBag", 
side_effect=factory):
             purge_cached_app()
             yield
 
diff --git a/airflow-core/tests/unit/api_fastapi/conftest.py 
b/airflow-core/tests/unit/api_fastapi/conftest.py
index 30cef3aa71e..b2497c194a5 100644
--- a/airflow-core/tests/unit/api_fastapi/conftest.py
+++ b/airflow-core/tests/unit/api_fastapi/conftest.py
@@ -170,10 +170,10 @@ def make_dag_with_multiple_versions(dag_maker, 
configure_git_connection_for_dag_
 
 @pytest.fixture(scope="module")
 def dagbag():
-    from airflow.models import DagBag
+    from airflow.models.dagbag import SchedulerDagBag
 
     parse_and_sync_to_db(os.devnull, include_examples=True)
-    return DagBag(read_dags_from_db=True)
+    return SchedulerDagBag()
 
 
 @pytest.fixture
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
index dd90faad258..6a62ba1021a 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
@@ -24,7 +24,7 @@ import pytest
 from httpx import Response
 from sqlalchemy import select
 
-from airflow.models.dagbag import DagBag
+from airflow.models.dagbag import SchedulerDagBag
 from airflow.models.dagcode import DagCode
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.state import DagRunState
@@ -46,9 +46,9 @@ TEST_DAG_DISPLAY_NAME = "example_simplest_dag"
 
 
 @pytest.fixture
-def test_dag():
+def test_dag(session):
     parse_and_sync_to_db(EXAMPLE_DAG_FILE, include_examples=False)
-    return DagBag(read_dags_from_db=True).get_dag(TEST_DAG_ID)
+    return SchedulerDagBag().get_latest_version_of_dag(TEST_DAG_ID, session)
 
 
 class TestGetDAGSource:
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_extra_links.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_extra_links.py
index 780330c0545..a09146e873f 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_extra_links.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_extra_links.py
@@ -16,15 +16,12 @@
 # under the License.
 from __future__ import annotations
 
-import os
-
 import pytest
 
 from airflow._shared.timezones import timezone
 from airflow.api_fastapi.common.dagbag import dag_bag_from_app
 from airflow.api_fastapi.core_api.datamodels.extra_links import 
ExtraLinkCollectionResponse
-from airflow.dag_processing.bundles.manager import DagBundlesManager
-from airflow.models.dagbag import DagBag
+from airflow.models.dagbag import SchedulerDagBag
 from airflow.models.xcom import XComModel as XCom
 from airflow.plugins_manager import AirflowPlugin
 from airflow.utils.state import DagRunState
@@ -92,14 +89,10 @@ class TestGetExtraLinks:
 
         self.dag = self._create_dag(dag_maker)
 
-        DagBundlesManager().sync_bundles_to_db()
-        dag_bag = DagBag(os.devnull, include_examples=False)
-        dag_bag.dags = {self.dag.dag_id: self.dag}
-
+        dag_bag = SchedulerDagBag()
         test_client.app.dependency_overrides[dag_bag_from_app] = lambda: 
dag_bag
-        dag_bag.sync_to_db("dags-folder", None)
 
-        self.dag.create_dagrun(
+        dag_maker.create_dagrun(
             run_id=self.dag_run_id,
             logical_date=self.default_time,
             run_type=DagRunType.MANUAL,
@@ -148,7 +141,7 @@ class TestGetExtraLinks:
         assert response.status_code == expected_status_code
         assert response.json() == expected_response
 
-    def test_should_respond_200(self, dag_maker, test_client):
+    def test_should_respond_200(self, test_client, session):
         XCom.set(
             key="search_query",
             value="TEST_LINK_VALUE",
@@ -163,7 +156,6 @@ class TestGetExtraLinks:
             dag_id=self.dag_id,
             run_id=self.dag_run_id,
         )
-
         response = test_client.get(
             
f"/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_single_link}/links",
         )
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
index 3a4e7c8ae71..a326f7f2915 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
@@ -32,6 +32,7 @@ from airflow._shared.timezones import timezone
 from airflow.api_fastapi.common.dagbag import create_dag_bag, dag_bag_from_app
 from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
 from airflow.models.dag import DAG
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk import task
 from airflow.utils.types import DagRunType
@@ -114,8 +115,6 @@ class TestTaskInstancesLog:
         session.commit()
 
         dagbag = create_dag_bag()
-        dagbag.bag_dag(dag)
-        dagbag.bag_dag(dummy_dag)
         test_client.app.dependency_overrides[dag_bag_from_app] = lambda: dagbag
 
     @pytest.fixture
@@ -276,7 +275,8 @@ class TestTaskInstancesLog:
         # Recreate DAG without tasks
         dagbag = create_dag_bag()
         dag = DAG(self.DAG_ID, schedule=None, 
start_date=timezone.parse(self.default_time))
-        dagbag.bag_dag(dag=dag)
+        dag.sync_to_db()
+        SerializedDagModel.write_dag(dag, bundle_name="testing")
 
         self.app.dependency_overrides[dag_bag_from_app] = lambda: dagbag
 
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 3a0ba67a50a..3cb85f80a70 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -103,7 +103,7 @@ class TestTaskInstanceEndpoint:
         with_ti_history=False,
     ):
         """Method to create task instances using kwargs and default 
arguments"""
-        dag = self.dagbag.get_dag(dag_id)
+        dag = self.dagbag.get_latest_version_of_dag(dag_id, session)
         tasks = dag.tasks
         counter = len(tasks)
         if task_instances is not None:
@@ -1164,7 +1164,7 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
     def test_not_found(self, test_client):
         response = test_client.get("/dags/invalid/dagRuns/~/taskInstances")
         assert response.status_code == 404
-        assert response.json() == {"detail": "DAG with dag_id: `invalid` was 
not found"}
+        assert response.json() == {"detail": "Dag with dag_id: `invalid` was 
not found"}
 
         response = test_client.get("/dags/~/dagRuns/invalid/taskInstances")
         assert response.status_code == 404
@@ -2223,7 +2223,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             task_instances=task_instances,
             update_extras=False,
         )
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(
             f"/dags/{request_dag}/clearTaskInstances",
             json=payload,
@@ -2249,7 +2248,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             update_extras=False,
             dag_run_state=State.FAILED,
         )
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(f"/dags/{dag_id}/clearTaskInstances", 
json=payload)
         assert response.status_code == 400
         assert (
@@ -2317,7 +2315,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             task_instances=task_instances,
             update_extras=False,
         )
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(
             f"/dags/{request_dag}/clearTaskInstances",
             json=payload,
@@ -2330,7 +2327,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
         self.create_task_instances(session)
         dag_id = "example_python_operator"
         payload = {"reset_dag_runs": True, "dry_run": False}
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(
             f"/dags/{dag_id}/clearTaskInstances",
             json=payload,
@@ -2349,7 +2345,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
         assert dagrun.state == "running"
 
         payload = {"dry_run": False, "reset_dag_runs": True, "task_ids": [""]}
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(
             f"/dags/{dag_id}/clearTaskInstances",
             json=payload,
@@ -2399,7 +2394,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             update_extras=False,
             dag_run_state=DagRunState.FAILED,
         )
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(
             f"/dags/{dag_id}/clearTaskInstances",
             json=payload,
@@ -2484,7 +2478,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             update_extras=False,
             dag_run_state=State.FAILED,
         )
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(
             f"/dags/{dag_id}/clearTaskInstances",
             json=payload,
@@ -2570,7 +2563,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             update_extras=False,
             dag_run_state=State.FAILED,
         )
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(
             f"/dags/{dag_id}/clearTaskInstances",
             json=payload,
@@ -2654,7 +2646,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             update_extras=False,
             dag_run_state=State.FAILED,
         )
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(
             f"/dags/{dag_id}/clearTaskInstances",
             json=payload,
@@ -2804,7 +2795,6 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             task_instances=task_instances,
             update_extras=False,
         )
-        self.dagbag.sync_to_db("dags-folder", None)
         response = test_client.post(
             "/dags/example_python_operator/clearTaskInstances",
             json=payload,
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_tasks.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_tasks.py
index 80dd29f6e65..8a12530264a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_tasks.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_tasks.py
@@ -16,14 +16,13 @@
 # under the License.
 from __future__ import annotations
 
-import os
 from datetime import datetime
 
 import pytest
 
 from airflow.api_fastapi.common.dagbag import dag_bag_from_app
 from airflow.models.dag import DAG
-from airflow.models.dagbag import DagBag
+from airflow.models.dagbag import SchedulerDagBag
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions._internal.expandinput import EXPAND_INPUT_EMPTY
@@ -64,12 +63,14 @@ class TestTaskEndpoint:
 
         task1 >> task2
         task4 >> task5
-        dag_bag = DagBag(os.devnull, include_examples=False)
-        dag_bag.dags = {
-            dag.dag_id: dag,
-            mapped_dag.dag_id: mapped_dag,
-            unscheduled_dag.dag_id: unscheduled_dag,
-        }
+        dag.sync_to_db()
+        SerializedDagModel.write_dag(dag, bundle_name="testing")
+        mapped_dag.sync_to_db()
+        SerializedDagModel.write_dag(mapped_dag, bundle_name="testing")
+        unscheduled_dag.sync_to_db()
+        SerializedDagModel.write_dag(unscheduled_dag, bundle_name="testing")
+        dag_bag = SchedulerDagBag()
+
         test_client.app.dependency_overrides[dag_bag_from_app] = lambda: 
dag_bag
 
     @staticmethod
@@ -239,7 +240,7 @@ class TestGetTask(TestTaskEndpoint):
         dag.sync_to_db()
         SerializedDagModel.write_dag(dag, bundle_name="test_bundle")
 
-        dag_bag = DagBag(os.devnull, include_examples=False, 
read_dags_from_db=True)
+        dag_bag = SchedulerDagBag()
         test_client.app.dependency_overrides[dag_bag_from_app] = lambda: 
dag_bag
 
         expected = {
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
index 1c4194c178b..da985795f52 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
@@ -533,7 +533,7 @@ class TestCreateXComEntry(TestXComEndpoint):
                 run_id,
                 XComCreateBody(key=TEST_XCOM_KEY, value=TEST_XCOM_VALUE),
                 404,
-                f"Task with ID: `invalid-task-id` not found in DAG: 
`{TEST_DAG_ID}`",
+                f"Task with ID: `invalid-task-id` not found in dag: 
`{TEST_DAG_ID}`",
                 id="task-not-found",
             ),
             # Test case: DAG Run not found
@@ -543,7 +543,7 @@ class TestCreateXComEntry(TestXComEndpoint):
                 "invalid-dag-run-id",
                 XComCreateBody(key=TEST_XCOM_KEY, value=TEST_XCOM_VALUE),
                 404,
-                f"DAG Run with ID: `invalid-dag-run-id` not found for DAG: 
`{TEST_DAG_ID}`",
+                f"Dag Run with ID: `invalid-dag-run-id` not found for dag: 
`{TEST_DAG_ID}`",
                 id="dag-run-not-found",
             ),
             # Test case: XCom entry already exists
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
index 661fd93157e..fd71a2f7d60 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
@@ -23,7 +23,7 @@ import pytest
 
 from airflow._shared.timezones import timezone
 from airflow.api_fastapi.common.dagbag import dag_bag_from_app
-from airflow.jobs.scheduler_job_runner import SchedulerDagBag
+from airflow.models.dagbag import SchedulerDagBag
 from airflow.utils.state import State
 
 from tests_common.test_utils.db import clear_db_assets, clear_db_runs
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 469845e229a..1f12fdc08c1 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -460,7 +460,7 @@ class TestSchedulerJob:
         scheduler_job = Job(executor=executor)
         self.job_runner = SchedulerJobRunner(scheduler_job)
         self.job_runner.scheduler_dag_bag = mock.MagicMock()
-        self.job_runner.scheduler_dag_bag.get_dag.side_effect = 
Exception("failed")
+        self.job_runner.scheduler_dag_bag.get_dag_for_run.side_effect = 
Exception("failed")
 
         session = settings.Session()
 
@@ -976,7 +976,7 @@ class TestSchedulerJob:
         self.job_runner = SchedulerJobRunner(job=scheduler_job)
 
         self.job_runner.scheduler_dag_bag = mock.MagicMock()
-        self.job_runner.scheduler_dag_bag.get_dag.return_value = None
+        self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None
 
         dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
 
@@ -3472,7 +3472,7 @@ class TestSchedulerJob:
         dr = drs[0]
 
         self.job_runner._schedule_dag_run(dag_run=dr, session=session)
-        len(self.job_runner.scheduler_dag_bag.get_dag(dr, session).tasks) == 1
+        len(self.job_runner.scheduler_dag_bag.get_dag_for_run(dr, 
session).tasks) == 1
         dag_version_1 = DagVersion.get_latest_version(dr.dag_id, 
session=session)
         assert dr.dag_versions[-1].id == dag_version_1.id
 
@@ -3490,7 +3490,7 @@ class TestSchedulerJob:
         assert len(drs) == 1
         dr = drs[0]
         assert dr.dag_versions[-1].id == dag_version_2.id
-        assert len(self.job_runner.scheduler_dag_bag.get_dag(dr, 
session).tasks) == 2
+        assert len(self.job_runner.scheduler_dag_bag.get_dag_for_run(dr, 
session).tasks) == 2
 
         if SQLALCHEMY_V_1_4:
             tis_count = (
@@ -4335,7 +4335,7 @@ class TestSchedulerJob:
         # assert len(self.job_runner.scheduler_dag_bag._dags) == 1  # sanity 
check
         # Get serialized dag
         dr = DagRun.find(dag_id=dag.dag_id)[0]
-        s_dag_2 = self.job_runner.scheduler_dag_bag.get_dag(dr, 
session=session)
+        s_dag_2 = self.job_runner.scheduler_dag_bag.get_dag_for_run(dr, 
session=session)
         custom_task = s_dag_2.task_dict["custom_task"]
         # Test that custom_task has no Operator Links (after de-serialization) 
in the Scheduling Loop
         assert not custom_task.operator_extra_links
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 0910c25a074..f3bb4d3d292 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -204,7 +204,7 @@ class ClearTaskInstancesBody(BaseModel):
     run_on_latest_version: Annotated[
         bool | None,
         Field(
-            description="(Experimental) Run on the latest bundle version of 
the DAG after clearing the task instances.",
+            description="(Experimental) Run on the latest bundle version of 
the dag after clearing the task instances.",
             title="Run On Latest Version",
         ),
     ] = False
@@ -318,7 +318,7 @@ class DAGRunClearBody(BaseModel):
     run_on_latest_version: Annotated[
         bool | None,
         Field(
-            description="(Experimental) Run on the latest bundle version of 
the DAG after clearing the DAG Run.",
+            description="(Experimental) Run on the latest bundle version of 
the Dag after clearing the Dag Run.",
             title="Run On Latest Version",
         ),
     ] = False

Reply via email to