This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 94517f8a62b fix(scheduler): Eager-load DagRun asset relationships before creating DagRunContext (#59714)
94517f8a62b is described below

commit 94517f8a62ba86ce0433a1a8d40f62aadb90c2b0
Author: Wei Lee <[email protected]>
AuthorDate: Tue Dec 23 09:50:17 2025 +0800

    fix(scheduler): Eager-load DagRun asset relationships before creating DagRunContext (#59714)
---
 .../src/airflow/jobs/scheduler_job_runner.py       |  8 +++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 78 ++++++++++++++++++++++
 2 files changed, 86 insertions(+)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 9f85ac7cc11..cac8a6d769a 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2178,6 +2178,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 ):
                     dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))
 
+                dag_run = session.scalar(
+                    select(DagRun)
+                    .where(DagRun.id == dag_run.id)
+                    .options(
+                        selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset),
+                        selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.source_aliases),
+                    )
+                )
                 callback_to_execute = DagCallbackRequest(
                     filepath=dag_model.relative_fileloc or "",
                     dag_id=dag.dag_id,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 31b3a3f2c85..b088c4c30f7 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -25,6 +25,7 @@ from collections import Counter, deque
 from collections.abc import Generator
 from datetime import timedelta
 from pathlib import Path
+from typing import TYPE_CHECKING
 from unittest import mock
 from unittest.mock import MagicMock, PropertyMock, patch
 from uuid import uuid4
@@ -116,6 +117,10 @@ from unit.listeners import dag_listener
 from unit.listeners.test_listeners import get_listener_manager
 from unit.models import TEST_DAGS_FOLDER
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
+    from tests_common.pytest_plugin import DagMaker
 pytestmark = pytest.mark.db_test
 
 PERF_DAGS_FOLDER = AIRFLOW_ROOT_PATH / "dev" / "airflow_perf" / "dags"
@@ -686,6 +691,79 @@ class TestSchedulerJob:
         assert len(callback_request.context_from_server.dag_run.consumed_asset_events) == 1
         assert callback_request.context_from_server.dag_run.consumed_asset_events[0].asset.uri == asset1.uri
 
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_schedule_dag_run_with_asset_event(self, session: Session, dag_maker: DagMaker):
+        """
+        Verify that scheduler can build DagRunContext for a timed-out Dag run
+        with consumed asset events without raising DetachedInstanceError.
+        """
+        asset1 = Asset(uri="test://asset1", name="test_asset_executor", group="test_group")
+        asset_model = AssetModel(name=asset1.name, uri=asset1.uri, group=asset1.group)
+        session.add(asset_model)
+        session.flush()
+
+        with dag_maker(
+            dag_id="test_executor_events_with_assets",
+            schedule=[asset1],
+            fileloc="/test_path1/",
+            dagrun_timeout=timedelta(minutes=1),
+        ):
+            EmptyOperator(task_id="dummy_task")
+
+        dag = dag_maker.dag
+        sync_dag_to_db(dag)
+        DagVersion.get_latest_version(dag.dag_id)
+
+        # Create Dag run that is guaranteed to time out
+        dr = dag_maker.create_dagrun(
+            start_date=timezone.utcnow() - timedelta(days=1),
+            state=DagRunState.RUNNING,
+        )
+
+        # Create asset event and attach to dag run
+        asset_event = AssetEvent(
+            asset_id=asset_model.id,
+            source_task_id="upstream_task",
+            source_dag_id="upstream_dag",
+            source_run_id="upstream_run",
+            source_map_index=-1,
+        )
+        session.add(asset_event)
+        session.flush()
+        dr.consumed_asset_events.append(asset_event)
+        session.add(dr)
+        session.flush()
+
+        executor = MockExecutor(do_update=False)
+        scheduler_job = Job(executor=executor)
+        self.job_runner = SchedulerJobRunner(scheduler_job)
+
+        ti1 = dr.get_task_instance("dummy_task")
+        if TYPE_CHECKING:
+            assert isinstance(ti1, TaskInstance)
+        ti1.state = State.FAILED
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.FAILED, None
+
+        callback = self.job_runner._schedule_dag_run(dr, session)
+        session.flush()
+
+        assert callback is not None
+        assert callback.is_failure_callback
+        assert callback.msg == "timed_out"
+
+        context = callback.context_from_server
+        assert context is not None
+
+        if TYPE_CHECKING:
+            assert isinstance(context.dag_run, DagRun)
+        events = context.dag_run.consumed_asset_events
+        assert len(events) == 1
+        assert events[0].asset is not None
+        assert events[0].source_aliases is not None
+
     def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
         dag_id = "SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute"
         task_id_1 = "dummy_task"

Reply via email to