This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 94517f8a62b fix(scheduler): Eager-load DagRun asset relationships before creating DagRunContext (#59714)
94517f8a62b is described below
commit 94517f8a62ba86ce0433a1a8d40f62aadb90c2b0
Author: Wei Lee <[email protected]>
AuthorDate: Tue Dec 23 09:50:17 2025 +0800
fix(scheduler): Eager-load DagRun asset relationships before creating DagRunContext (#59714)
---
.../src/airflow/jobs/scheduler_job_runner.py | 8 +++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 78 ++++++++++++++++++++++
2 files changed, 86 insertions(+)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 9f85ac7cc11..cac8a6d769a 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2178,6 +2178,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
):
dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))
+ dag_run = session.scalar(
+ select(DagRun)
+ .where(DagRun.id == dag_run.id)
+ .options(
+ selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset),
+ selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.source_aliases),
+ )
+ )
callback_to_execute = DagCallbackRequest(
filepath=dag_model.relative_fileloc or "",
dag_id=dag.dag_id,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 31b3a3f2c85..b088c4c30f7 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -25,6 +25,7 @@ from collections import Counter, deque
from collections.abc import Generator
from datetime import timedelta
from pathlib import Path
+from typing import TYPE_CHECKING
from unittest import mock
from unittest.mock import MagicMock, PropertyMock, patch
from uuid import uuid4
@@ -116,6 +117,10 @@ from unit.listeners import dag_listener
from unit.listeners.test_listeners import get_listener_manager
from unit.models import TEST_DAGS_FOLDER
+if TYPE_CHECKING:
+ from sqlalchemy.orm.session import Session
+
+ from tests_common.pytest_plugin import DagMaker
pytestmark = pytest.mark.db_test
PERF_DAGS_FOLDER = AIRFLOW_ROOT_PATH / "dev" / "airflow_perf" / "dags"
@@ -686,6 +691,79 @@ class TestSchedulerJob:
assert len(callback_request.context_from_server.dag_run.consumed_asset_events) == 1
assert callback_request.context_from_server.dag_run.consumed_asset_events[0].asset.uri == asset1.uri
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_schedule_dag_run_with_asset_event(self, session: Session, dag_maker: DagMaker):
+ """
+ Verify that scheduler can build DagRunContext for a timed-out Dag run
+ with consumed asset events without raising DetachedInstanceError.
+ """
+ asset1 = Asset(uri="test://asset1", name="test_asset_executor", group="test_group")
+ asset_model = AssetModel(name=asset1.name, uri=asset1.uri, group=asset1.group)
+ session.add(asset_model)
+ session.flush()
+
+ with dag_maker(
+ dag_id="test_executor_events_with_assets",
+ schedule=[asset1],
+ fileloc="/test_path1/",
+ dagrun_timeout=timedelta(minutes=1),
+ ):
+ EmptyOperator(task_id="dummy_task")
+
+ dag = dag_maker.dag
+ sync_dag_to_db(dag)
+ DagVersion.get_latest_version(dag.dag_id)
+
+ # Create Dag run that is guaranteed to time out
+ dr = dag_maker.create_dagrun(
+ start_date=timezone.utcnow() - timedelta(days=1),
+ state=DagRunState.RUNNING,
+ )
+
+ # Create asset event and attach to dag run
+ asset_event = AssetEvent(
+ asset_id=asset_model.id,
+ source_task_id="upstream_task",
+ source_dag_id="upstream_dag",
+ source_run_id="upstream_run",
+ source_map_index=-1,
+ )
+ session.add(asset_event)
+ session.flush()
+ dr.consumed_asset_events.append(asset_event)
+ session.add(dr)
+ session.flush()
+
+ executor = MockExecutor(do_update=False)
+ scheduler_job = Job(executor=executor)
+ self.job_runner = SchedulerJobRunner(scheduler_job)
+
+ ti1 = dr.get_task_instance("dummy_task")
+ if TYPE_CHECKING:
+ assert isinstance(ti1, TaskInstance)
+ ti1.state = State.FAILED
+ session.merge(ti1)
+ session.commit()
+
+ executor.event_buffer[ti1.key] = State.FAILED, None
+
+ callback = self.job_runner._schedule_dag_run(dr, session)
+ session.flush()
+
+ assert callback is not None
+ assert callback.is_failure_callback
+ assert callback.msg == "timed_out"
+
+ context = callback.context_from_server
+ assert context is not None
+
+ if TYPE_CHECKING:
+ assert isinstance(context.dag_run, DagRun)
+ events = context.dag_run.consumed_asset_events
+ assert len(events) == 1
+ assert events[0].asset is not None
+ assert events[0].source_aliases is not None
+
def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
dag_id = "SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute"
task_id_1 = "dummy_task"