This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 20b387016e7 Refactor SchedulerDagBag and use it across the API server
(#53153)
20b387016e7 is described below
commit 20b387016e7414b2ebf15259918dd2002387c3b9
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Jul 29 12:17:40 2025 +0100
Refactor SchedulerDagBag and use it across the API server (#53153)
* Refactor SchedulerDagBag and use it across the API server
This removes the use of DagBag which parses files in some cases and
replaces it with refactored SchedulerDagBag.
* fixup! Merge branch 'refactor-schedulerdagbag' of
github.com:astronomer/airflow into refactor-schedulerdagbag
* Apply suggestions from code review
Co-authored-by: Copilot <[email protected]>
* fixup! Apply suggestions from code review
* fixup! fixup! Apply suggestions from code review
* fixup! fixup! fixup! Apply suggestions from code review
* fixup! fixup! fixup! fixup! Apply suggestions from code review
* fixup! fixup! fixup! fixup! fixup! Apply suggestions from code review
* fixup! fixup! fixup! fixup! fixup! fixup! Apply suggestions from code
review
* Fix mypy
---------
Co-authored-by: Copilot <[email protected]>
---
airflow-core/src/airflow/api_fastapi/app.py | 4 +-
.../src/airflow/api_fastapi/common/dagbag.py | 11 ++--
.../api_fastapi/core_api/datamodels/dag_run.py | 2 +-
.../core_api/datamodels/task_instances.py | 2 +-
.../core_api/openapi/v2-rest-api-generated.yaml | 6 +-
.../api_fastapi/core_api/routes/public/assets.py | 2 +-
.../api_fastapi/core_api/routes/public/dag_run.py | 16 +++--
.../core_api/routes/public/dag_versions.py | 3 +-
.../api_fastapi/core_api/routes/public/dags.py | 7 +-
.../core_api/routes/public/extra_links.py | 10 ++-
.../api_fastapi/core_api/routes/public/log.py | 2 +-
.../core_api/routes/public/task_instances.py | 75 +++++++++++++---------
.../api_fastapi/core_api/routes/public/tasks.py | 9 +--
.../api_fastapi/core_api/routes/public/xcom.py | 20 ++++--
.../api_fastapi/core_api/routes/ui/assets.py | 2 +-
.../api_fastapi/core_api/routes/ui/calendar.py | 7 +-
.../core_api/services/public/task_instances.py | 2 +-
.../api_fastapi/execution_api/routes/dag_runs.py | 4 +-
.../execution_api/routes/task_instances.py | 28 ++++----
.../src/airflow/jobs/scheduler_job_runner.py | 59 +++--------------
airflow-core/src/airflow/models/dag.py | 10 +--
airflow-core/src/airflow/models/dagbag.py | 63 ++++++++++++++++++
airflow-core/src/airflow/models/taskinstance.py | 14 ++--
.../airflow/serialization/serialized_objects.py | 9 +++
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 4 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 4 +-
.../tests/unit/api_fastapi/common/test_dagbag.py | 4 +-
airflow-core/tests/unit/api_fastapi/conftest.py | 4 +-
.../core_api/routes/public/test_dag_sources.py | 6 +-
.../core_api/routes/public/test_extra_links.py | 16 ++---
.../api_fastapi/core_api/routes/public/test_log.py | 6 +-
.../core_api/routes/public/test_task_instances.py | 14 +---
.../core_api/routes/public/test_tasks.py | 19 +++---
.../core_api/routes/public/test_xcom.py | 4 +-
.../versions/v2025_04_28/test_task_instances.py | 2 +-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 10 +--
.../src/airflowctl/api/datamodels/generated.py | 4 +-
37 files changed, 256 insertions(+), 208 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/app.py
b/airflow-core/src/airflow/api_fastapi/app.py
index 17de4f677cd..f515be6992c 100644
--- a/airflow-core/src/airflow/api_fastapi/app.py
+++ b/airflow-core/src/airflow/api_fastapi/app.py
@@ -84,10 +84,8 @@ def create_app(apps: str = "all") -> FastAPI:
dag_bag = create_dag_bag()
if "execution" in apps_list or "all" in apps_list:
- from airflow.jobs.scheduler_job_runner import SchedulerDagBag
-
task_exec_api_app = create_task_execution_api_app()
- task_exec_api_app.state.dag_bag = SchedulerDagBag()
+ task_exec_api_app.state.dag_bag = dag_bag
init_error_handlers(task_exec_api_app)
app.mount("/execution", task_exec_api_app)
diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
index 479ef5cd7a5..47dabcd0f6a 100644
--- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
+++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
@@ -20,16 +20,15 @@ from typing import Annotated
from fastapi import Depends, Request
-from airflow.models.dagbag import DagBag
-from airflow.settings import DAGS_FOLDER
+from airflow.models.dagbag import SchedulerDagBag
-def create_dag_bag() -> DagBag:
+def create_dag_bag() -> SchedulerDagBag:
"""Create DagBag to retrieve DAGs from the database."""
- return DagBag(DAGS_FOLDER, read_dags_from_db=True)
+ return SchedulerDagBag()
-def dag_bag_from_app(request: Request) -> DagBag:
+def dag_bag_from_app(request: Request) -> SchedulerDagBag:
"""
FastAPI dependency resolver that returns the shared DagBag instance from
app.state.
@@ -39,4 +38,4 @@ def dag_bag_from_app(request: Request) -> DagBag:
return request.app.state.dag_bag
-DagBagDep = Annotated[DagBag, Depends(dag_bag_from_app)]
+DagBagDep = Annotated[SchedulerDagBag, Depends(dag_bag_from_app)]
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 686e21b444a..901fc2a53a8 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -57,7 +57,7 @@ class DAGRunClearBody(StrictBaseModel):
only_failed: bool = False
run_on_latest_version: bool = Field(
default=False,
- description="(Experimental) Run on the latest bundle version of the
DAG after clearing the DAG Run.",
+ description="(Experimental) Run on the latest bundle version of the
Dag after clearing the Dag Run.",
)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index bb04e829652..3611df275e7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -181,7 +181,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
include_past: bool = False
run_on_latest_version: bool = Field(
default=False,
- description="(Experimental) Run on the latest bundle version of the
DAG after "
+ description="(Experimental) Run on the latest bundle version of the
dag after "
"clearing the task instances.",
)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 856d8a83cf2..333263f1b41 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -8690,7 +8690,7 @@ components:
run_on_latest_version:
type: boolean
title: Run On Latest Version
- description: (Experimental) Run on the latest bundle version of the
DAG
+ description: (Experimental) Run on the latest bundle version of the
dag
after clearing the task instances.
default: false
additionalProperties: false
@@ -9333,8 +9333,8 @@ components:
run_on_latest_version:
type: boolean
title: Run On Latest Version
- description: (Experimental) Run on the latest bundle version of the
DAG
- after clearing the DAG Run.
+ description: (Experimental) Run on the latest bundle version of the
Dag
+ after clearing the Dag Run.
default: false
additionalProperties: false
type: object
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index ace75342932..08b25cc2fd3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -371,7 +371,7 @@ def materialize_asset(
)
dag: DAG | None
- if not (dag := dag_bag.get_dag(dag_id)):
+ if not (dag := dag_bag.get_latest_version_of_dag(dag_id, session)):
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with ID
`{dag_id}` was not found")
return dag.create_dagrun(
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 643ee1e528f..f7dfef23deb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -79,7 +79,7 @@ from airflow.api_fastapi.core_api.services.public.dag_run
import DagRunWaiter
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import ParamValidationError
from airflow.listeners.listener import get_listener_manager
-from airflow.models import DAG, DagModel, DagRun
+from airflow.models import DagModel, DagRun
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -168,7 +168,7 @@ def patch_dag_run(
f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}`
was not found",
)
- dag: DAG = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_dag_for_run(dag_run, session=session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
@@ -274,9 +274,11 @@ def clear_dag_run(
f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}`
was not found",
)
- dag: DAG = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_dag_for_run(dag_run, session=session)
if body.dry_run:
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id
{dag_id} was not found")
task_instances = dag.clear(
run_id=dag_run_id,
task_ids=None,
@@ -290,6 +292,8 @@ def clear_dag_run(
task_instances=cast("list[TaskInstanceResponse]", task_instances),
total_entries=len(task_instances),
)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
dag.clear(
run_id=dag_run_id,
task_ids=None,
@@ -352,7 +356,7 @@ def get_dag_runs(
query = select(DagRun)
if dag_id != "~":
- dag: DAG = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The DAG with
dag_id: `{dag_id}` was not found")
@@ -417,7 +421,9 @@ def trigger_dag_run(
)
try:
- dag: DAG = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with dag_id:
'{dag_id}' not found")
params = body.validate_context(dag)
dag_run = dag.create_dagrun(
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_versions.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_versions.py
index df0d46142a3..e71db7a4ff4 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_versions.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_versions.py
@@ -39,7 +39,6 @@ from airflow.api_fastapi.core_api.datamodels.dag_versions
import (
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_dag
-from airflow.models.dag import DAG
from airflow.models.dag_version import DagVersion
dag_versions_router = AirflowRouter(tags=["DagVersion"],
prefix="/dags/{dag_id}/dagVersions")
@@ -112,7 +111,7 @@ def get_dag_versions(
query = select(DagVersion).options(joinedload(DagVersion.dag_model))
if dag_id != "~":
- dag: DAG = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The DAG with
dag_id: `{dag_id}` was not found")
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
index 7a63eece51c..649f744207b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
@@ -67,7 +67,7 @@ from airflow.api_fastapi.core_api.security import (
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import AirflowException, DagNotFound
-from airflow.models import DAG, DagModel
+from airflow.models import DagModel
from airflow.models.dag_favorite import DagFavorite
from airflow.models.dagrun import DagRun
@@ -172,7 +172,7 @@ def get_dag(
dag_bag: DagBagDep,
) -> DAGResponse:
"""Get basic information about a DAG."""
- dag: DAG = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
@@ -199,8 +199,7 @@ def get_dag(
)
def get_dag_details(dag_id: str, session: SessionDep, dag_bag: DagBagDep) ->
DAGDetailsResponse:
"""Get details of DAG."""
- # todo: can we use lazy deser dag here?
- dag: DAG = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py
index 1be9f580b06..085f61c2803 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py
@@ -29,12 +29,12 @@ from airflow.api_fastapi.core_api.datamodels.extra_links
import ExtraLinkCollect
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import DagAccessEntity,
requires_access_dag
from airflow.exceptions import TaskNotFound
+from airflow.models import DagRun
if TYPE_CHECKING:
from airflow.models.mappedoperator import MappedOperator
from airflow.serialization.serialized_objects import SerializedBaseOperator
-
extra_links_router = AirflowRouter(
tags=["Extra Links"],
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links"
)
@@ -57,7 +57,13 @@ def get_extra_links(
"""Get extra links for task instance."""
from airflow.models.taskinstance import TaskInstance
- if (dag := dag_bag.get_dag(dag_id)) is None:
+ dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id,
DagRun.run_id == dag_run_id))
+
+ if dag_run:
+ dag = dag_bag.get_dag_for_run(dag_run, session=session)
+ else:
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session=session)
+ if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with ID =
{dag_id} not found")
try:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
index 688a563d2b6..5f1d04a244a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
@@ -139,7 +139,7 @@ def get_log(
metadata["end_of_log"] = True
raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not
found")
- dag = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_dag_for_run(ti.dag_run, session=session)
if dag:
with contextlib.suppress(TaskNotFound):
ti.task = dag.get_task(ti.task_id)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 07885a7a667..c5e612c0036 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -181,7 +181,11 @@ def get_mapped_task_instances(
# 0 can mean a mapped TI that expanded to an empty list, so it is not an
automatic 404
unfiltered_total_count = get_query_count(query, session=session)
if unfiltered_total_count == 0:
- dag = dag_bag.get_dag(dag_id)
+ dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id,
DagRun.run_id == dag_run_id))
+ if dag_run:
+ dag = dag_bag.get_dag_for_run(dag_run, session=session)
+ else:
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session=session)
if not dag:
error_message = f"DAG {dag_id} not found"
raise HTTPException(status.HTTP_404_NOT_FOUND, error_message)
@@ -258,7 +262,8 @@ def get_task_instance_dependencies(
deps = []
if ti.state in [None, TaskInstanceState.SCHEDULED]:
- dag = dag_bag.get_dag(ti.dag_id)
+ dag_run = session.scalar(select(DagRun).where(DagRun.dag_id ==
ti.dag_id, DagRun.run_id == ti.run_id))
+ dag = dag_bag.get_dag_for_run(dag_run, session=session)
if dag:
try:
@@ -437,6 +442,7 @@ def get_task_instances(
This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve
Task Instances for all DAGs
and DAG runs.
"""
+ dag_run = None
query = (
select(TI)
.join(TI.dag_run)
@@ -444,13 +450,6 @@ def get_task_instances(
.options(joinedload(TI.dag_version))
.options(joinedload(TI.dag_run).options(joinedload(DagRun.dag_model)))
)
-
- if dag_id != "~":
- dag = dag_bag.get_dag(dag_id)
- if not dag:
- raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id:
`{dag_id}` was not found")
- query = query.where(TI.dag_id == dag_id)
-
if dag_run_id != "~":
dag_run = session.scalar(select(DagRun).filter_by(run_id=dag_run_id))
if not dag_run:
@@ -459,6 +458,14 @@ def get_task_instances(
f"DagRun with run_id: `{dag_run_id}` was not found",
)
query = query.where(TI.run_id == dag_run_id)
+ if dag_id != "~":
+ if dag_run:
+ dag = dag_bag.get_dag_for_run(dag_run, session)
+ else:
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with dag_id:
`{dag_id}` was not found")
+ query = query.where(TI.dag_id == dag_id)
task_instance_select, total_entries = paginated_select(
statement=query,
@@ -654,7 +661,7 @@ def post_clear_task_instances(
session: SessionDep,
) -> TaskInstanceCollectionResponse:
"""Clear task instances."""
- dag = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
error_message = f"DAG {dag_id} not found"
raise HTTPException(status.HTTP_404_NOT_FOUND, error_message)
@@ -675,11 +682,10 @@ def post_clear_task_instances(
if dag_run is None:
error_message = f"Dag Run id {dag_run_id} not found in dag
{dag_id}"
raise HTTPException(status.HTTP_404_NOT_FOUND, error_message)
- # If dag_run_id is provided, we should get the dag from SchedulerDagBag
- # to ensure we get the right version.
- from airflow.jobs.scheduler_job_runner import SchedulerDagBag
-
- dag = SchedulerDagBag().get_dag(dag_run=dag_run, session=session)
+ # Get the specific dag version:
+ dag = dag_bag.get_dag_for_run(dag_run=dag_run, session=session)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG {dag_id} not
found")
if past or future:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
@@ -707,21 +713,30 @@ def post_clear_task_instances(
# If we had upstream/downstream etc then also include those!
task_ids.extend(tid for tid in dag.task_dict if tid != task_id)
- task_instances = dag.clear(
- dry_run=True,
- run_id=None if past or future else dag_run_id,
- task_ids=task_ids,
- dag_bag=dag_bag,
- session=session,
- **body.model_dump(
- include={
- "start_date",
- "end_date",
- "only_failed",
- "only_running",
- }
- ),
- )
+ # Prepare common parameters
+ common_params = {
+ "dry_run": True,
+ "task_ids": task_ids,
+ "dag_bag": dag_bag,
+ "session": session,
+ "run_on_latest_version": body.run_on_latest_version,
+ "only_failed": body.only_failed,
+ "only_running": body.only_running,
+ }
+
+ if dag_run_id is not None and not (past or future):
+ # Use run_id-based clearing when we have a specific dag_run_id and not
using past/future
+ task_instances = dag.clear(
+ **common_params,
+ run_id=dag_run_id,
+ )
+ else:
+ # Use date-based clearing when no dag_run_id or when past/future is
specified
+ task_instances = dag.clear(
+ **common_params,
+ start_date=body.start_date,
+ end_date=body.end_date,
+ )
if not dry_run:
clear_task_instances(
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/tasks.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/tasks.py
index c8ce20ab137..aecd4c4be44 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/tasks.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/tasks.py
@@ -24,12 +24,12 @@ from fastapi import Depends, HTTPException, status
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
from airflow.api_fastapi.common.dagbag import DagBagDep
+from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.tasks import
TaskCollectionResponse, TaskResponse
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_dag
from airflow.exceptions import TaskNotFound
-from airflow.models import DAG
tasks_router = AirflowRouter(tags=["Task"], prefix="/dags/{dag_id}/tasks")
@@ -47,10 +47,11 @@ tasks_router = AirflowRouter(tags=["Task"],
prefix="/dags/{dag_id}/tasks")
def get_tasks(
dag_id: str,
dag_bag: DagBagDep,
+ session: SessionDep,
order_by: str = "task_id",
) -> TaskCollectionResponse:
"""Get tasks for DAG."""
- dag: DAG = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
try:
@@ -73,9 +74,9 @@ def get_tasks(
),
dependencies=[Depends(requires_access_dag(method="GET",
access_entity=DagAccessEntity.TASK))],
)
-def get_task(dag_id: str, task_id, dag_bag: DagBagDep) -> TaskResponse:
+def get_task(dag_id: str, task_id, session: SessionDep, dag_bag: DagBagDep) ->
TaskResponse:
"""Get simplified representation of a task."""
- dag: DAG = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
try:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
index eed0309da2f..b886ce8f950 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
@@ -39,7 +39,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_
from airflow.api_fastapi.core_api.security import ReadableXComFilterDep,
requires_access_dag
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import TaskNotFound
-from airflow.models import DAG, DagRun as DR
+from airflow.models import DagRun as DR
from airflow.models.xcom import XComModel
xcom_router = AirflowRouter(
@@ -188,8 +188,14 @@ def create_xcom_entry(
dag_bag: DagBagDep,
) -> XComResponseNative:
"""Create an XCom entry."""
+ from airflow.models.dagrun import DagRun
+
+ dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id,
DagRun.run_id == dag_run_id))
# Validate DAG ID
- dag: DAG = dag_bag.get_dag(dag_id)
+ if dag_run:
+ dag = dag_bag.get_dag_for_run(dag_run, session)
+ else:
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with ID:
`{dag_id}` was not found")
@@ -198,15 +204,15 @@ def create_xcom_entry(
dag.get_task(task_id)
except TaskNotFound:
raise HTTPException(
- status.HTTP_404_NOT_FOUND, f"Task with ID: `{task_id}` not found
in DAG: `{dag_id}`"
+ status.HTTP_404_NOT_FOUND, f"Task with ID: `{task_id}` not found
in dag: `{dag_id}`"
)
# Validate DAG Run ID
- dag_run = dag.get_dagrun(dag_run_id, session)
if not dag_run:
- raise HTTPException(
- status.HTTP_404_NOT_FOUND, f"DAG Run with ID: `{dag_run_id}` not
found for DAG: `{dag_id}`"
- )
+ if not dag_run:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, f"Dag Run with ID: `{dag_run_id}`
not found for dag: `{dag_id}`"
+ )
# Check existing XCom
already_existing_query = XComModel.get_many(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
index fc68f2f1d06..e1976d15fb2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
@@ -39,7 +39,7 @@ def next_run_assets(
dag_bag: DagBagDep,
session: SessionDep,
) -> dict:
- dag = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"can't find dag
{dag_id}")
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/calendar.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/calendar.py
index cdb1902904b..62bf90dfd24 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/calendar.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/calendar.py
@@ -18,7 +18,8 @@ from __future__ import annotations
from typing import Annotated, Literal
-from fastapi import Depends
+from fastapi import Depends, HTTPException
+from starlette import status
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
from airflow.api_fastapi.common.dagbag import DagBagDep
@@ -58,7 +59,9 @@ def get_calendar(
granularity: Literal["hourly", "daily"] = "daily",
) -> CalendarTimeRangeCollectionResponse:
"""Get calendar data for a DAG including historical and planned DAG
runs."""
- dag = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with dag_id:
'{dag_id}' not found")
calendar_service = CalendarService()
return calendar_service.get_calendar_data(
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
index 4a25e07340b..65b09284132 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
@@ -56,7 +56,7 @@ def _patch_ti_validate_request(
map_index: int | None = -1,
update_mask: list[str] | None = Query(None),
) -> tuple[DAG, list[TI], dict]:
- dag = dag_bag.get_dag(dag_id)
+ dag = dag_bag.get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG {dag_id} not
found")
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 332fe8cd500..d3650868fa1 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -123,13 +123,13 @@ def clear_dag_run(
"message": f"DAG with dag_id: '{dag_id}' has import errors and
cannot be triggered",
},
)
- from airflow.jobs.scheduler_job_runner import SchedulerDagBag
+ from airflow.models.dagbag import SchedulerDagBag
dag_run = session.scalar(
select(DagRunModel).where(DagRunModel.dag_id == dag_id,
DagRunModel.run_id == run_id)
)
dag_bag = SchedulerDagBag()
- dag = dag_bag.get_dag(dag_run=dag_run, session=session)
+ dag = dag_bag.get_dag_for_run(dag_run=dag_run, session=session)
if not dag:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 09ef33d10b1..de196f5ff64 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -29,7 +29,7 @@ from uuid import UUID
import attrs
import structlog
from cadwyn import VersionedAPIRouter
-from fastapi import Body, Depends, HTTPException, Query, status
+from fastapi import Body, HTTPException, Query, status
from pydantic import JsonValue
from sqlalchemy import func, or_, tuple_, update
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
@@ -38,7 +38,7 @@ from sqlalchemy.sql import select
from structlog.contextvars import bind_contextvars
from airflow._shared.timezones import timezone
-from airflow.api_fastapi.common.dagbag import dag_bag_from_app
+from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
@@ -59,7 +59,7 @@ from
airflow.api_fastapi.execution_api.datamodels.taskinstance import (
from airflow.api_fastapi.execution_api.deps import JWTBearerTIPathDep
from airflow.exceptions import TaskNotFound
from airflow.models.asset import AssetActive
-from airflow.models.dagbag import DagBag
+from airflow.models.dagbag import SchedulerDagBag
from airflow.models.dagrun import DagRun as DR
from airflow.models.taskinstance import TaskInstance as TI,
_stop_remaining_tasks
from airflow.models.taskreschedule import TaskReschedule
@@ -76,10 +76,6 @@ if TYPE_CHECKING:
from airflow.models.expandinput import SchedulerExpandInput
from airflow.sdk.types import Operator
-from airflow.jobs.scheduler_job_runner import SchedulerDagBag
-
-SchedulerDagBagDep = Annotated[SchedulerDagBag, Depends(dag_bag_from_app)]
-
router = VersionedAPIRouter()
ti_id_router = VersionedAPIRouter(
@@ -107,7 +103,7 @@ def ti_run(
task_instance_id: UUID,
ti_run_payload: Annotated[TIEnterRunningPayload, Body()],
session: SessionDep,
- dag_bag: SchedulerDagBagDep,
+ dag_bag: DagBagDep,
) -> TIRunContext:
"""
Run a TaskInstance.
@@ -258,7 +254,7 @@ def ti_run(
or 0
)
- if dag := dag_bag.get_dag(dag_run=dr, session=session):
+ if dag := dag_bag.get_dag_for_run(dag_run=dr, session=session):
upstream_map_indexes = dict(
_get_upstream_map_indexes(dag.get_task(ti.task_id),
ti.map_index, ti.run_id, session)
)
@@ -333,7 +329,7 @@ def ti_update_state(
task_instance_id: UUID,
ti_patch_payload: Annotated[TIStateUpdate, Body()],
session: SessionDep,
- dag_bag: SchedulerDagBagDep,
+ dag_bag: DagBagDep,
):
"""
Update the state of a TaskInstance.
@@ -420,9 +416,9 @@ def ti_update_state(
)
-def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep,
dag_bag: SchedulerDagBagDep) -> None:
+def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep,
dag_bag: DagBagDep) -> None:
dr = ti.dag_run
- ser_dag = dag_bag.get_dag(dag_run=dr, session=session)
+ ser_dag = dag_bag.get_dag_for_run(dag_run=dr, session=session)
if ser_dag and getattr(ser_dag, "fail_fast", False):
task_dict = getattr(ser_dag, "task_dict")
task_teardown_map = {k: v.is_teardown for k, v in task_dict.items()}
@@ -436,7 +432,7 @@ def _create_ti_state_update_query_and_update_state(
query: Update,
updated_state,
session: SessionDep,
- dag_bag: SchedulerDagBagDep,
+ dag_bag: DagBagDep,
dag_id: str,
) -> tuple[Update, TaskInstanceState]:
if isinstance(ti_patch_payload, (TITerminalStatePayload,
TIRetryStatePayload, TISuccessStatePayload)):
@@ -854,7 +850,7 @@ def _is_eligible_to_retry(state: str, try_number: int,
max_tries: int) -> bool:
def _get_group_tasks(dag_id: str, task_group_id: str, session: SessionDep,
logical_dates=None, run_ids=None):
# Get all tasks in the task group
- dag = DagBag(read_dags_from_db=True).get_dag(dag_id, session)
+ dag = SchedulerDagBag().get_latest_version_of_dag(dag_id, session)
if not dag:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
@@ -897,7 +893,7 @@ def _get_group_tasks(dag_id: str, task_group_id: str,
session: SessionDep, logic
def validate_inlets_and_outlets(
task_instance_id: UUID,
session: SessionDep,
- dag_bag: SchedulerDagBagDep,
+ dag_bag: DagBagDep,
) -> InactiveAssetsResponse:
"""Validate whether there're inactive assets in inlets and outlets of a
given task instance."""
ti_id_str = str(task_instance_id)
@@ -916,7 +912,7 @@ def validate_inlets_and_outlets(
if not ti.task:
dr = ti.dag_run
- dag = dag_bag.get_dag(dag_run=dr, session=session)
+ dag = dag_bag.get_dag_for_run(dag_run=dr, session=session)
if dag:
with contextlib.suppress(TaskNotFound):
ti.task = dag.get_task(ti.task_id)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index e3411d42417..961d85bc2fe 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -60,6 +60,7 @@ from airflow.models.asset import (
from airflow.models.backfill import Backfill
from airflow.models.dag import DAG, DagModel
from airflow.models.dag_version import DagVersion
+from airflow.models.dagbag import SchedulerDagBag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.serialized_dag import SerializedDagModel
@@ -104,48 +105,6 @@ TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued
reschedule"
""":meta private:"""
-class SchedulerDagBag:
- """
- Internal class for retrieving and caching dags in the scheduler.
-
- :meta private:
- """
-
- def __init__(self):
- self._dags: dict[str, DAG] = {} # dag_version_id to dag
-
- def _get_dag(self, version_id: str, session: Session) -> DAG | None:
- if dag := self._dags.get(version_id):
- return dag
- dag_version = session.get(DagVersion, version_id,
options=[joinedload(DagVersion.serialized_dag)])
- if not dag_version:
- return None
- serdag = dag_version.serialized_dag
- if not serdag:
- return None
- serdag.load_op_links = False
- dag = serdag.dag
- if not dag:
- return None
- self._dags[version_id] = dag
- return dag
-
- @staticmethod
- def _version_from_dag_run(dag_run, latest, session):
- if latest or not dag_run.bundle_version:
- dag_version = DagVersion.get_latest_version(dag_id=dag_run.dag_id,
session=session)
- if dag_version:
- return dag_version
-
- return dag_run.created_dag_version
-
- def get_dag(self, dag_run: DagRun, session: Session, latest=False) -> DAG
| None:
- version = self._version_from_dag_run(dag_run=dag_run, latest=latest,
session=session)
- if not version:
- return None
- return self._get_dag(version_id=version.id, session=session)
-
-
def _get_current_dag(dag_id: str, session: Session) -> DAG | None:
serdag = SerializedDagModel.get(dag_id=dag_id, session=session) # grabs
the latest version
if not serdag:
@@ -253,7 +212,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if log:
self._log = log
- self.scheduler_dag_bag = SchedulerDagBag()
+ self.scheduler_dag_bag = SchedulerDagBag(load_op_links=False)
@provide_session
def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
@@ -544,7 +503,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if task_instance.dag_model.has_task_concurrency_limits:
# Many dags don't have a task_concurrency, so where we can
avoid loading the full
# serialized DAG the better.
- serialized_dag = self.scheduler_dag_bag.get_dag(
+ serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
dag_run=task_instance.dag_run, session=session
)
# If the dag is missing, fail the task and continue to the
next task.
@@ -905,7 +864,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# Get task from the Serialized DAG
try:
- dag = scheduler_dag_bag.get_dag(dag_run=ti.dag_run,
session=session)
+ dag =
scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
cls.logger().error(
"DAG '%s' for task instance %s not found in
serialized_dag table",
ti.dag_id,
@@ -1034,7 +993,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
.group_by(DagRun)
)
for dag_run in paused_runs:
- dag = self.scheduler_dag_bag.get_dag(dag_run=dag_run,
session=session)
+ dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run,
session=session)
if dag is not None:
dag_run.dag = dag
_, callback_to_run =
dag_run.update_state(execute_callbacks=False, session=session)
@@ -1386,7 +1345,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# Send the callbacks after we commit to ensure the context is up to
date when it gets run
# cache saves time during scheduling of many dag_runs for same dag
cached_get_dag: Callable[[DagRun], DAG | None] = lru_cache()(
- partial(self.scheduler_dag_bag.get_dag, session=session)
+ partial(self.scheduler_dag_bag.get_dag_for_run, session=session)
)
for dag_run, callback_to_run in callback_tuples:
dag = cached_get_dag(dag_run)
@@ -1728,7 +1687,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# cache saves time during scheduling of many dag_runs for same dag
cached_get_dag: Callable[[DagRun], DAG | None] = lru_cache()(
- partial(self.scheduler_dag_bag.get_dag, session=session)
+ partial(self.scheduler_dag_bag.get_dag_for_run, session=session)
)
span = Trace.get_current_span()
@@ -1818,7 +1777,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
callback: DagCallbackRequest | None = None
- dag = dag_run.dag =
self.scheduler_dag_bag.get_dag(dag_run=dag_run, session=session)
+ dag = dag_run.dag =
self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, session=session)
dag_model = DM.get_dagmodel(dag_run.dag_id, session)
if not dag or not dag_model:
@@ -1936,7 +1895,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.log.debug("DAG %s not changed structure, skipping
dagrun.verify_integrity", dag_run.dag_id)
return True
# Refresh the DAG
- dag_run.dag = self.scheduler_dag_bag.get_dag(dag_run=dag_run,
session=session)
+ dag_run.dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run,
session=session)
if not dag_run.dag:
return False
# Select all TIs in State.unfinished and update the dag_version_id
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index 5c72eb83665..5d68ad226d2 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -1282,11 +1282,11 @@ class DAG(TaskSDKDag, LoggingMixin):
only_running: bool = False,
confirm_prompt: bool = False,
dag_run_state: DagRunState = DagRunState.QUEUED,
- run_on_latest_version: bool = False,
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
exclude_run_ids: frozenset[str] | None = frozenset(),
+ run_on_latest_version: bool = False,
) -> list[TaskInstance]: ... # pragma: no cover
@overload
@@ -1300,11 +1300,11 @@ class DAG(TaskSDKDag, LoggingMixin):
confirm_prompt: bool = False,
dag_run_state: DagRunState = DagRunState.QUEUED,
dry_run: Literal[False] = False,
- run_on_latest_version: bool = False,
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
exclude_run_ids: frozenset[str] | None = frozenset(),
+ run_on_latest_version: bool = False,
) -> int: ... # pragma: no cover
@overload
@@ -1319,11 +1319,11 @@ class DAG(TaskSDKDag, LoggingMixin):
only_running: bool = False,
confirm_prompt: bool = False,
dag_run_state: DagRunState = DagRunState.QUEUED,
- run_on_latest_version: bool = False,
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
exclude_run_ids: frozenset[str] | None = frozenset(),
+ run_on_latest_version: bool = False,
) -> list[TaskInstance]: ... # pragma: no cover
@overload
@@ -1338,11 +1338,11 @@ class DAG(TaskSDKDag, LoggingMixin):
confirm_prompt: bool = False,
dag_run_state: DagRunState = DagRunState.QUEUED,
dry_run: Literal[False] = False,
- run_on_latest_version: bool = False,
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
exclude_run_ids: frozenset[str] | None = frozenset(),
+ run_on_latest_version: bool = False,
) -> int: ... # pragma: no cover
@provide_session
@@ -1358,11 +1358,11 @@ class DAG(TaskSDKDag, LoggingMixin):
confirm_prompt: bool = False,
dag_run_state: DagRunState = DagRunState.QUEUED,
dry_run: bool = False,
- run_on_latest_version: bool = False,
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
exclude_run_ids: frozenset[str] | None = frozenset(),
+ run_on_latest_version: bool = False,
) -> int | Iterable[TaskInstance]:
"""
Clear a set of task instances associated with the current dag for a
specified date range.
diff --git a/airflow-core/src/airflow/models/dagbag.py
b/airflow-core/src/airflow/models/dagbag.py
index a6ee7403b04..23c6645c53c 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -37,6 +37,7 @@ from sqlalchemy import (
Column,
String,
)
+from sqlalchemy.orm import joinedload
from tabulate import tabulate
from airflow import settings
@@ -52,6 +53,7 @@ from airflow.exceptions import (
)
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import Base, StringID
+from airflow.models.dag_version import DagVersion
from airflow.stats import Stats
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.docs import get_docs_url
@@ -71,6 +73,7 @@ if TYPE_CHECKING:
from sqlalchemy.orm import Session
+ from airflow.models import DagRun
from airflow.models.dag import DAG
from airflow.models.dagwarning import DagWarning
from airflow.utils.types import ArgNotSet
@@ -712,6 +715,66 @@ class DagBag(LoggingMixin):
)
+class SchedulerDagBag:
+ """
+ Internal class for retrieving and caching dags in the scheduler.
+
+ :meta private:
+ """
+
+ def __init__(self, load_op_links: bool = True):
+ self._dags: dict[str, DAG] = {} # dag_version_id to dag
+ self.load_op_links = load_op_links
+
+ def _get_dag(self, version_id: str, session: Session) -> DAG | None:
+ if dag := self._dags.get(version_id):
+ return dag
+ dag_version = session.get(DagVersion, version_id,
options=[joinedload(DagVersion.serialized_dag)])
+ if not dag_version:
+ return None
+ serdag = dag_version.serialized_dag
+ if not serdag:
+ return None
+ serdag.load_op_links = self.load_op_links
+ dag = serdag.dag
+ if not dag:
+ return None
+ self._dags[version_id] = dag
+ return dag
+
+ @staticmethod
+ def _version_from_dag_run(dag_run, session):
+ if not dag_run.bundle_version:
+ dag_version = DagVersion.get_latest_version(dag_id=dag_run.dag_id,
session=session)
+ if dag_version:
+ return dag_version
+
+ return dag_run.created_dag_version
+
+ def get_dag_for_run(self, dag_run: DagRun, session: Session) -> DAG | None:
+ version = self._version_from_dag_run(dag_run=dag_run, session=session)
+ if not version:
+ return None
+ return self._get_dag(version_id=version.id, session=session)
+
+ def get_latest_version_of_dag(self, dag_id: str, session: Session) -> DAG
| None:
+ """
+ Get the latest version of a DAG by its ID.
+
+ This method retrieves the latest version of the DAG with the given ID.
+ """
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ serdag = SerializedDagModel.get(dag_id, session=session)
+ if not serdag:
+ return None
+ serdag.load_op_links = self.load_op_links
+ dag = serdag.dag
+
+ self._dags[serdag.dag_version.id] = dag
+ return dag
+
+
def generate_md5_hash(context):
bundle_name = context.get_current_parameters()["bundle_name"]
relative_fileloc = context.get_current_parameters()["relative_fileloc"]
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 9059c6d3986..53ca7c6589e 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -223,9 +223,9 @@ def clear_task_instances(
:meta private:
"""
task_instance_ids: list[str] = []
- from airflow.jobs.scheduler_job_runner import SchedulerDagBag
+ from airflow.models.dagbag import SchedulerDagBag
- scheduler_dagbag = SchedulerDagBag()
+ scheduler_dagbag = SchedulerDagBag(load_op_links=False)
for ti in tis:
task_instance_ids.append(ti.id)
@@ -236,7 +236,10 @@ def clear_task_instances(
ti.state = TaskInstanceState.RESTARTING
else:
dr = ti.dag_run
- ti_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session,
latest=run_on_latest_version)
+ if run_on_latest_version:
+ ti_dag = scheduler_dagbag.get_latest_version_of_dag(ti.dag_id,
session=session)
+ else:
+ ti_dag = scheduler_dagbag.get_dag_for_run(dag_run=dr,
session=session)
if not ti_dag:
log.warning("No serialized dag found for dag '%s'", dr.dag_id)
task_id = ti.task_id
@@ -279,7 +282,10 @@ def clear_task_instances(
if dr.state in State.finished_dr_states:
dr.state = dag_run_state
dr.start_date = timezone.utcnow()
- dr_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session,
latest=run_on_latest_version)
+ if run_on_latest_version:
+ dr_dag =
scheduler_dagbag.get_latest_version_of_dag(dr.dag_id, session=session)
+ else:
+ dr_dag = scheduler_dagbag.get_dag_for_run(dag_run=dr,
session=session)
if not dr_dag:
log.warning("No serialized dag found for dag '%s'",
dr.dag_id)
if dr_dag and not dr_dag.disable_bundle_versioning and
run_on_latest_version:
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py
b/airflow-core/src/airflow/serialization/serialized_objects.py
index 3932344c4c3..87297df4f33 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1336,6 +1336,15 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
def expand_start_trigger_args(self, *, context: Context) ->
StartTriggerArgs | None:
return self.start_trigger_args
+ def __getattr__(self, name):
+ # Handle missing attributes with task_type instead of
SerializedBaseOperator
+ # Don't intercept special methods that Python internals might check
+ if name.startswith("__") and name.endswith("__"):
+ # For special methods, raise the original error
+ raise AttributeError(f"'{self.__class__.__name__}' object has no
attribute '{name}'")
+ # For regular attributes, use task_type in the error message
+ raise AttributeError(f"'{self.task_type}' object has no attribute
'{name}'")
+
@classmethod
def serialize_mapped_operator(cls, op: MappedOperator) -> dict[str, Any]:
serialized_op = cls._serialize_node(op)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index d836de1bfce..4a9683afdc9 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1209,7 +1209,7 @@ export const $ClearTaskInstancesBody = {
run_on_latest_version: {
type: 'boolean',
title: 'Run On Latest Version',
- description: '(Experimental) Run on the latest bundle version of
the DAG after clearing the task instances.',
+ description: '(Experimental) Run on the latest bundle version of
the dag after clearing the task instances.',
default: false
}
},
@@ -2187,7 +2187,7 @@ export const $DAGRunClearBody = {
run_on_latest_version: {
type: 'boolean',
title: 'Run On Latest Version',
- description: '(Experimental) Run on the latest bundle version of
the DAG after clearing the DAG Run.',
+ description: '(Experimental) Run on the latest bundle version of
the Dag after clearing the Dag Run.',
default: false
}
},
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 47d4b65cd89..b777ca39767 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -390,7 +390,7 @@ export type ClearTaskInstancesBody = {
include_future?: boolean;
include_past?: boolean;
/**
- * (Experimental) Run on the latest bundle version of the DAG after
clearing the task instances.
+ * (Experimental) Run on the latest bundle version of the dag after
clearing the task instances.
*/
run_on_latest_version?: boolean;
};
@@ -597,7 +597,7 @@ export type DAGRunClearBody = {
dry_run?: boolean;
only_failed?: boolean;
/**
- * (Experimental) Run on the latest bundle version of the DAG after
clearing the DAG Run.
+ * (Experimental) Run on the latest bundle version of the Dag after
clearing the Dag Run.
*/
run_on_latest_version?: boolean;
};
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
b/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
index b0342c29180..be16cd276a9 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
@@ -48,13 +48,13 @@ class TestDagBagSingleton:
"""Patch DagBag once before app is created, and reset counter."""
self.dagbag_call_counter["count"] = 0
- from airflow.models.dagbag import DagBag as RealDagBag
+ from airflow.models.dagbag import SchedulerDagBag as RealDagBag
def factory(*args, **kwargs):
self.dagbag_call_counter["count"] += 1
return RealDagBag(*args, **kwargs)
- with mock.patch("airflow.api_fastapi.common.dagbag.DagBag",
side_effect=factory):
+ with mock.patch("airflow.api_fastapi.common.dagbag.SchedulerDagBag",
side_effect=factory):
purge_cached_app()
yield
diff --git a/airflow-core/tests/unit/api_fastapi/conftest.py
b/airflow-core/tests/unit/api_fastapi/conftest.py
index 30cef3aa71e..b2497c194a5 100644
--- a/airflow-core/tests/unit/api_fastapi/conftest.py
+++ b/airflow-core/tests/unit/api_fastapi/conftest.py
@@ -170,10 +170,10 @@ def make_dag_with_multiple_versions(dag_maker,
configure_git_connection_for_dag_
@pytest.fixture(scope="module")
def dagbag():
- from airflow.models import DagBag
+ from airflow.models.dagbag import SchedulerDagBag
parse_and_sync_to_db(os.devnull, include_examples=True)
- return DagBag(read_dags_from_db=True)
+ return SchedulerDagBag()
@pytest.fixture
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
index dd90faad258..6a62ba1021a 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
@@ -24,7 +24,7 @@ import pytest
from httpx import Response
from sqlalchemy import select
-from airflow.models.dagbag import DagBag
+from airflow.models.dagbag import SchedulerDagBag
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.state import DagRunState
@@ -46,9 +46,9 @@ TEST_DAG_DISPLAY_NAME = "example_simplest_dag"
@pytest.fixture
-def test_dag():
+def test_dag(session):
parse_and_sync_to_db(EXAMPLE_DAG_FILE, include_examples=False)
- return DagBag(read_dags_from_db=True).get_dag(TEST_DAG_ID)
+ return SchedulerDagBag().get_latest_version_of_dag(TEST_DAG_ID, session)
class TestGetDAGSource:
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_extra_links.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_extra_links.py
index 780330c0545..a09146e873f 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_extra_links.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_extra_links.py
@@ -16,15 +16,12 @@
# under the License.
from __future__ import annotations
-import os
-
import pytest
from airflow._shared.timezones import timezone
from airflow.api_fastapi.common.dagbag import dag_bag_from_app
from airflow.api_fastapi.core_api.datamodels.extra_links import
ExtraLinkCollectionResponse
-from airflow.dag_processing.bundles.manager import DagBundlesManager
-from airflow.models.dagbag import DagBag
+from airflow.models.dagbag import SchedulerDagBag
from airflow.models.xcom import XComModel as XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import DagRunState
@@ -92,14 +89,10 @@ class TestGetExtraLinks:
self.dag = self._create_dag(dag_maker)
- DagBundlesManager().sync_bundles_to_db()
- dag_bag = DagBag(os.devnull, include_examples=False)
- dag_bag.dags = {self.dag.dag_id: self.dag}
-
+ dag_bag = SchedulerDagBag()
test_client.app.dependency_overrides[dag_bag_from_app] = lambda:
dag_bag
- dag_bag.sync_to_db("dags-folder", None)
- self.dag.create_dagrun(
+ dag_maker.create_dagrun(
run_id=self.dag_run_id,
logical_date=self.default_time,
run_type=DagRunType.MANUAL,
@@ -148,7 +141,7 @@ class TestGetExtraLinks:
assert response.status_code == expected_status_code
assert response.json() == expected_response
- def test_should_respond_200(self, dag_maker, test_client):
+ def test_should_respond_200(self, test_client, session):
XCom.set(
key="search_query",
value="TEST_LINK_VALUE",
@@ -163,7 +156,6 @@ class TestGetExtraLinks:
dag_id=self.dag_id,
run_id=self.dag_run_id,
)
-
response = test_client.get(
f"/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_single_link}/links",
)
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
index 3a4e7c8ae71..a326f7f2915 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
@@ -32,6 +32,7 @@ from airflow._shared.timezones import timezone
from airflow.api_fastapi.common.dagbag import create_dag_bag, dag_bag_from_app
from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
from airflow.models.dag import DAG
+from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import task
from airflow.utils.types import DagRunType
@@ -114,8 +115,6 @@ class TestTaskInstancesLog:
session.commit()
dagbag = create_dag_bag()
- dagbag.bag_dag(dag)
- dagbag.bag_dag(dummy_dag)
test_client.app.dependency_overrides[dag_bag_from_app] = lambda: dagbag
@pytest.fixture
@@ -276,7 +275,8 @@ class TestTaskInstancesLog:
# Recreate DAG without tasks
dagbag = create_dag_bag()
dag = DAG(self.DAG_ID, schedule=None,
start_date=timezone.parse(self.default_time))
- dagbag.bag_dag(dag=dag)
+ dag.sync_to_db()
+ SerializedDagModel.write_dag(dag, bundle_name="testing")
self.app.dependency_overrides[dag_bag_from_app] = lambda: dagbag
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 3a0ba67a50a..3cb85f80a70 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -103,7 +103,7 @@ class TestTaskInstanceEndpoint:
with_ti_history=False,
):
"""Method to create task instances using kwargs and default
arguments"""
- dag = self.dagbag.get_dag(dag_id)
+ dag = self.dagbag.get_latest_version_of_dag(dag_id, session)
tasks = dag.tasks
counter = len(tasks)
if task_instances is not None:
@@ -1164,7 +1164,7 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
def test_not_found(self, test_client):
response = test_client.get("/dags/invalid/dagRuns/~/taskInstances")
assert response.status_code == 404
- assert response.json() == {"detail": "DAG with dag_id: `invalid` was
not found"}
+ assert response.json() == {"detail": "Dag with dag_id: `invalid` was
not found"}
response = test_client.get("/dags/~/dagRuns/invalid/taskInstances")
assert response.status_code == 404
@@ -2223,7 +2223,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
task_instances=task_instances,
update_extras=False,
)
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
f"/dags/{request_dag}/clearTaskInstances",
json=payload,
@@ -2249,7 +2248,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
update_extras=False,
dag_run_state=State.FAILED,
)
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(f"/dags/{dag_id}/clearTaskInstances",
json=payload)
assert response.status_code == 400
assert (
@@ -2317,7 +2315,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
task_instances=task_instances,
update_extras=False,
)
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
f"/dags/{request_dag}/clearTaskInstances",
json=payload,
@@ -2330,7 +2327,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
self.create_task_instances(session)
dag_id = "example_python_operator"
payload = {"reset_dag_runs": True, "dry_run": False}
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
f"/dags/{dag_id}/clearTaskInstances",
json=payload,
@@ -2349,7 +2345,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
assert dagrun.state == "running"
payload = {"dry_run": False, "reset_dag_runs": True, "task_ids": [""]}
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
f"/dags/{dag_id}/clearTaskInstances",
json=payload,
@@ -2399,7 +2394,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
update_extras=False,
dag_run_state=DagRunState.FAILED,
)
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
f"/dags/{dag_id}/clearTaskInstances",
json=payload,
@@ -2484,7 +2478,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
update_extras=False,
dag_run_state=State.FAILED,
)
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
f"/dags/{dag_id}/clearTaskInstances",
json=payload,
@@ -2570,7 +2563,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
update_extras=False,
dag_run_state=State.FAILED,
)
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
f"/dags/{dag_id}/clearTaskInstances",
json=payload,
@@ -2654,7 +2646,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
update_extras=False,
dag_run_state=State.FAILED,
)
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
f"/dags/{dag_id}/clearTaskInstances",
json=payload,
@@ -2804,7 +2795,6 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
task_instances=task_instances,
update_extras=False,
)
- self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
"/dags/example_python_operator/clearTaskInstances",
json=payload,
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_tasks.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_tasks.py
index 80dd29f6e65..8a12530264a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_tasks.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_tasks.py
@@ -16,14 +16,13 @@
# under the License.
from __future__ import annotations
-import os
from datetime import datetime
import pytest
from airflow.api_fastapi.common.dagbag import dag_bag_from_app
from airflow.models.dag import DAG
-from airflow.models.dagbag import DagBag
+from airflow.models.dagbag import SchedulerDagBag
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions._internal.expandinput import EXPAND_INPUT_EMPTY
@@ -64,12 +63,14 @@ class TestTaskEndpoint:
task1 >> task2
task4 >> task5
- dag_bag = DagBag(os.devnull, include_examples=False)
- dag_bag.dags = {
- dag.dag_id: dag,
- mapped_dag.dag_id: mapped_dag,
- unscheduled_dag.dag_id: unscheduled_dag,
- }
+ dag.sync_to_db()
+ SerializedDagModel.write_dag(dag, bundle_name="testing")
+ mapped_dag.sync_to_db()
+ SerializedDagModel.write_dag(mapped_dag, bundle_name="testing")
+ unscheduled_dag.sync_to_db()
+ SerializedDagModel.write_dag(unscheduled_dag, bundle_name="testing")
+ dag_bag = SchedulerDagBag()
+
test_client.app.dependency_overrides[dag_bag_from_app] = lambda:
dag_bag
@staticmethod
@@ -239,7 +240,7 @@ class TestGetTask(TestTaskEndpoint):
dag.sync_to_db()
SerializedDagModel.write_dag(dag, bundle_name="test_bundle")
- dag_bag = DagBag(os.devnull, include_examples=False,
read_dags_from_db=True)
+ dag_bag = SchedulerDagBag()
test_client.app.dependency_overrides[dag_bag_from_app] = lambda:
dag_bag
expected = {
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
index 1c4194c178b..da985795f52 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
@@ -533,7 +533,7 @@ class TestCreateXComEntry(TestXComEndpoint):
run_id,
XComCreateBody(key=TEST_XCOM_KEY, value=TEST_XCOM_VALUE),
404,
- f"Task with ID: `invalid-task-id` not found in DAG:
`{TEST_DAG_ID}`",
+ f"Task with ID: `invalid-task-id` not found in dag:
`{TEST_DAG_ID}`",
id="task-not-found",
),
# Test case: DAG Run not found
@@ -543,7 +543,7 @@ class TestCreateXComEntry(TestXComEndpoint):
"invalid-dag-run-id",
XComCreateBody(key=TEST_XCOM_KEY, value=TEST_XCOM_VALUE),
404,
- f"DAG Run with ID: `invalid-dag-run-id` not found for DAG:
`{TEST_DAG_ID}`",
+ f"Dag Run with ID: `invalid-dag-run-id` not found for dag:
`{TEST_DAG_ID}`",
id="dag-run-not-found",
),
# Test case: XCom entry already exists
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
index 661fd93157e..fd71a2f7d60 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
@@ -23,7 +23,7 @@ import pytest
from airflow._shared.timezones import timezone
from airflow.api_fastapi.common.dagbag import dag_bag_from_app
-from airflow.jobs.scheduler_job_runner import SchedulerDagBag
+from airflow.models.dagbag import SchedulerDagBag
from airflow.utils.state import State
from tests_common.test_utils.db import clear_db_assets, clear_db_runs
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 469845e229a..1f12fdc08c1 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -460,7 +460,7 @@ class TestSchedulerJob:
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
self.job_runner.scheduler_dag_bag = mock.MagicMock()
- self.job_runner.scheduler_dag_bag.get_dag.side_effect =
Exception("failed")
+ self.job_runner.scheduler_dag_bag.get_dag_for_run.side_effect =
Exception("failed")
session = settings.Session()
@@ -976,7 +976,7 @@ class TestSchedulerJob:
self.job_runner = SchedulerJobRunner(job=scheduler_job)
self.job_runner.scheduler_dag_bag = mock.MagicMock()
- self.job_runner.scheduler_dag_bag.get_dag.return_value = None
+ self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None
dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
@@ -3472,7 +3472,7 @@ class TestSchedulerJob:
dr = drs[0]
self.job_runner._schedule_dag_run(dag_run=dr, session=session)
- len(self.job_runner.scheduler_dag_bag.get_dag(dr, session).tasks) == 1
+ len(self.job_runner.scheduler_dag_bag.get_dag_for_run(dr,
session).tasks) == 1
dag_version_1 = DagVersion.get_latest_version(dr.dag_id,
session=session)
assert dr.dag_versions[-1].id == dag_version_1.id
@@ -3490,7 +3490,7 @@ class TestSchedulerJob:
assert len(drs) == 1
dr = drs[0]
assert dr.dag_versions[-1].id == dag_version_2.id
- assert len(self.job_runner.scheduler_dag_bag.get_dag(dr,
session).tasks) == 2
+ assert len(self.job_runner.scheduler_dag_bag.get_dag_for_run(dr,
session).tasks) == 2
if SQLALCHEMY_V_1_4:
tis_count = (
@@ -4335,7 +4335,7 @@ class TestSchedulerJob:
# assert len(self.job_runner.scheduler_dag_bag._dags) == 1 # sanity
check
# Get serialized dag
dr = DagRun.find(dag_id=dag.dag_id)[0]
- s_dag_2 = self.job_runner.scheduler_dag_bag.get_dag(dr,
session=session)
+ s_dag_2 = self.job_runner.scheduler_dag_bag.get_dag_for_run(dr,
session=session)
custom_task = s_dag_2.task_dict["custom_task"]
# Test that custom_task has no Operator Links (after de-serialization)
in the Scheduling Loop
assert not custom_task.operator_extra_links
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 0910c25a074..f3bb4d3d292 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -204,7 +204,7 @@ class ClearTaskInstancesBody(BaseModel):
run_on_latest_version: Annotated[
bool | None,
Field(
- description="(Experimental) Run on the latest bundle version of
the DAG after clearing the task instances.",
+ description="(Experimental) Run on the latest bundle version of
the dag after clearing the task instances.",
title="Run On Latest Version",
),
] = False
@@ -318,7 +318,7 @@ class DAGRunClearBody(BaseModel):
run_on_latest_version: Annotated[
bool | None,
Field(
- description="(Experimental) Run on the latest bundle version of
the DAG after clearing the DAG Run.",
+ description="(Experimental) Run on the latest bundle version of
the Dag after clearing the Dag Run.",
title="Run On Latest Version",
),
] = False