This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 73451c57ec8 Fix asset scheduling for stale DAGs (#59337) (#60022)
73451c57ec8 is described below

commit 73451c57ec86049000cf2bbb19c59f59f6bb5f8c
Author: Jason K Hall <[email protected]>
AuthorDate: Mon Jan 26 18:15:32 2026 -0700

    Fix asset scheduling for stale DAGs (#59337) (#60022)
    
    Co-authored-by: Wei Lee <[email protected]>
---
 airflow-core/src/airflow/assets/manager.py         |  9 +++---
 airflow-core/tests/unit/assets/test_manager.py     | 36 ++++++++++++++++++++++
 airflow-core/tests/unit/jobs/test_scheduler_job.py |  6 +++-
 3 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index 7a641be006b..b32bcdeac7b 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -230,9 +230,7 @@ class AssetManager(LoggingMixin):
         session.add(asset_event)
         session.flush()  # Ensure the event is written earlier than ADRQ entries below.
 
-        dags_to_queue_from_asset = {
-            ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_stale and not ref.dag.is_paused
-        }
+        dags_to_queue_from_asset = {ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_paused}
 
         dags_to_queue_from_asset_alias = set()
         if source_alias_names:
@@ -251,7 +249,7 @@ class AssetManager(LoggingMixin):
                 dags_to_queue_from_asset_alias |= {
                     alias_ref.dag
                     for alias_ref in asset_alias_model.scheduled_dags
-                    if not alias_ref.dag.is_stale and not alias_ref.dag.is_paused
+                    if not alias_ref.dag.is_paused
                 }
 
         dags_to_queue_from_asset_ref = set(
@@ -263,7 +261,8 @@ class AssetManager(LoggingMixin):
                     or_(
                         DagScheduleAssetNameReference.name == asset.name,
                         DagScheduleAssetUriReference.uri == asset.uri,
-                    )
+                    ),
+                    DagModel.is_paused.is_(False),
                 )
             )
         )
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index df6608607c8..a3402e8e0ec 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -259,3 +259,39 @@ class TestAssetManager:
 
         assert len(set(ids)) == 1
         assert session.scalar(select(func.count()).select_from(AssetPartitionDagRun)) == 1
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_register_asset_change_queues_stale_dag(self, session, mock_task_instance):
+        asset_manager = AssetManager()
+        bundle_name = "testing"
+
+        # Setup an Asset
+        asset_uri = "test://stale_asset/"
+        asset_name = "test_stale_asset"
+        asset_definition = Asset(uri=asset_uri, name=asset_name)
+
+        asm = AssetModel(uri=asset_uri, name=asset_name, group="asset")
+        session.add(asm)
+
+        # Setup a Dag that is STALE but NOT PAUSED
+        # We want stale Dags to still receive asset updates
+        stale_dag = DagModel(dag_id="stale_dag", is_stale=True, is_paused=False, bundle_name=bundle_name)
+        session.add(stale_dag)
+
+        # Link the Stale Dag to the Asset
+        asm.scheduled_dags = [DagScheduleAssetReference(dag_id=stale_dag.dag_id)]
+
+        session.execute(delete(AssetDagRunQueue))
+        session.flush()
+
+        # Register the asset change
+        asset_manager.register_asset_change(
+            task_instance=mock_task_instance, asset=asset_definition, session=session
+        )
+        session.flush()
+
+        # Verify the stale Dag was NOT ignored
+        assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 1
+
+        queued_id = session.scalar(select(AssetDagRunQueue.target_dag_id))
+        assert queued_id == "stale_dag"
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 0cf22fdf98f..28ea1d41851 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -4654,7 +4654,10 @@ class TestSchedulerJob:
         )
         session.flush()
         assert session.scalars(ase_q).one().source_run_id == dr1.run_id
-        assert session.scalars(adrq_q).one_or_none() is None
+        if "is_stale" in disable:
+            assert session.scalars(adrq_q).one_or_none() is not None
+        else:
+            assert session.scalars(adrq_q).one_or_none() is None
 
         # Simulate the consumer DAG being enabled.
         session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**enable))
@@ -4668,6 +4671,7 @@ class TestSchedulerJob:
         )
         session.flush()
         assert [e.source_run_id for e in session.scalars(ase_q)] == [dr1.run_id, dr2.run_id]
+        assert len(session.scalars(adrq_q).all()) == 1
         assert session.scalars(adrq_q).one().target_dag_id == "consumer"
 
     @time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)

Reply via email to