This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 8f644c87371 [v3-1-test] Fix asset scheduling for stale DAGs (#59337) (#60022) (#61106)
8f644c87371 is described below

commit 8f644c873711e5b796534ad44737d411b030cbc8
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jan 28 12:26:59 2026 +0800

    [v3-1-test] Fix asset scheduling for stale DAGs (#59337) (#60022) (#61106)
    
    Co-authored-by: Jason K Hall <[email protected]>
    Co-authored-by: Wei Lee <[email protected]>
    Fix asset scheduling for stale DAGs (#59337) (#60022)
---
 airflow-core/src/airflow/assets/manager.py         |  9 +++--
 airflow-core/tests/unit/assets/test_manager.py     | 40 +++++++++++++++++++++-
 airflow-core/tests/unit/jobs/test_scheduler_job.py |  6 +++-
 3 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index c0dce3861ec..17d03cb2758 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -164,9 +164,7 @@ class AssetManager(LoggingMixin):
         session.add(asset_event)
         session.flush()  # Ensure the event is written earlier than DDRQ entries below.
 
-        dags_to_queue_from_asset = {
-            ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_stale and not ref.dag.is_paused
-        }
+        dags_to_queue_from_asset = {ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_paused}
 
         dags_to_queue_from_asset_alias = set()
         if source_alias_names:
@@ -185,7 +183,7 @@ class AssetManager(LoggingMixin):
                 dags_to_queue_from_asset_alias |= {
                     alias_ref.dag
                     for alias_ref in asset_alias_model.scheduled_dags
-                    if not alias_ref.dag.is_stale and not alias_ref.dag.is_paused
+                    if not alias_ref.dag.is_paused
                 }
 
         dags_to_queue_from_asset_ref = set(
@@ -197,7 +195,8 @@ class AssetManager(LoggingMixin):
                     or_(
                         DagScheduleAssetNameReference.name == asset.name,
                         DagScheduleAssetUriReference.uri == asset.uri,
-                    )
+                    ),
+                    DagModel.is_paused.is_(False),
                 )
             )
         )
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index 94b63e72501..330522cbda1 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -21,7 +21,7 @@ import itertools
 from unittest import mock
 
 import pytest
-from sqlalchemy import delete
+from sqlalchemy import delete, func, select
 from sqlalchemy.orm import Session
 
 from airflow.assets.manager import AssetManager
@@ -204,3 +204,41 @@ class TestAssetManager:
         assert len(asset_listener.created) == 1
         assert len(asms) == 1
         assert asset_listener.created[0].uri == asset.uri == asms[0].uri
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_register_asset_change_queues_stale_dag(self, session, mock_task_instance):
+        asset_manager = AssetManager()
+        asset_listener.clear()
+        bundle_name = "testing"
+
+        # Setup an Asset
+        asset_uri = "test://stale_asset/"
+        asset_name = "test_stale_asset"
+        asset_definition = Asset(uri=asset_uri, name=asset_name)
+
+        asm = AssetModel(uri=asset_uri, name=asset_name, group="asset")
+        session.add(asm)
+
+        # Setup a Dag that is STALE but NOT PAUSED
+        # We want stale Dags to still receive asset updates
+        stale_dag = DagModel(dag_id="stale_dag", is_stale=True, is_paused=False, bundle_name=bundle_name)
+        session.add(stale_dag)
+
+        # Link the Stale Dag to the Asset
+        asm.scheduled_dags = [DagScheduleAssetReference(dag_id=stale_dag.dag_id)]
+
+        session.execute(delete(AssetDagRunQueue))
+        session.flush()
+
+        # Register the asset change
+        asset_manager.register_asset_change(
+            task_instance=mock_task_instance, asset=asset_definition, session=session
+        )
+        session.flush()
+
+        # Verify the stale Dag was NOT ignored
+        assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 1
+
+        queued_id = session.scalar(select(AssetDagRunQueue.target_dag_id))
+        assert queued_id == "stale_dag"
+        asset_listener.clear()
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 10f58785e14..aba109f9d41 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -4626,7 +4626,10 @@ class TestSchedulerJob:
         )
         session.flush()
         assert session.scalars(ase_q).one().source_run_id == dr1.run_id
-        assert session.scalars(adrq_q).one_or_none() is None
+        if "is_stale" in disable:
+            assert session.scalars(adrq_q).one_or_none() is not None
+        else:
+            assert session.scalars(adrq_q).one_or_none() is None
 
         # Simulate the consumer DAG being enabled.
         session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**enable))
@@ -4640,6 +4643,7 @@ class TestSchedulerJob:
         )
         session.flush()
         assert [e.source_run_id for e in session.scalars(ase_q)] == [dr1.run_id, dr2.run_id]
+        assert len(session.scalars(adrq_q).all()) == 1
         assert session.scalars(adrq_q).one().target_dag_id == "consumer"
 
     @time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)

Reply via email to