This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 73451c57ec8 Fix asset scheduling for stale DAGs (#59337) (#60022)
73451c57ec8 is described below
commit 73451c57ec86049000cf2bbb19c59f59f6bb5f8c
Author: Jason K Hall <[email protected]>
AuthorDate: Mon Jan 26 18:15:32 2026 -0700
Fix asset scheduling for stale DAGs (#59337) (#60022)
Co-authored-by: Wei Lee <[email protected]>
---
airflow-core/src/airflow/assets/manager.py | 9 +++---
airflow-core/tests/unit/assets/test_manager.py | 36 ++++++++++++++++++++++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 6 +++-
3 files changed, 45 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index 7a641be006b..b32bcdeac7b 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -230,9 +230,7 @@ class AssetManager(LoggingMixin):
session.add(asset_event)
session.flush() # Ensure the event is written earlier than ADRQ entries below.
- dags_to_queue_from_asset = {
- ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_stale and not ref.dag.is_paused
- }
+ dags_to_queue_from_asset = {ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_paused}
dags_to_queue_from_asset_alias = set()
if source_alias_names:
@@ -251,7 +249,7 @@ class AssetManager(LoggingMixin):
dags_to_queue_from_asset_alias |= {
alias_ref.dag
for alias_ref in asset_alias_model.scheduled_dags
- if not alias_ref.dag.is_stale and not alias_ref.dag.is_paused
+ if not alias_ref.dag.is_paused
}
dags_to_queue_from_asset_ref = set(
@@ -263,7 +261,8 @@ class AssetManager(LoggingMixin):
or_(
DagScheduleAssetNameReference.name == asset.name,
DagScheduleAssetUriReference.uri == asset.uri,
- )
+ ),
+ DagModel.is_paused.is_(False),
)
)
)
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index df6608607c8..a3402e8e0ec 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -259,3 +259,39 @@ class TestAssetManager:
assert len(set(ids)) == 1
assert session.scalar(select(func.count()).select_from(AssetPartitionDagRun)) == 1
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_register_asset_change_queues_stale_dag(self, session, mock_task_instance):
+ asset_manager = AssetManager()
+ bundle_name = "testing"
+
+ # Setup an Asset
+ asset_uri = "test://stale_asset/"
+ asset_name = "test_stale_asset"
+ asset_definition = Asset(uri=asset_uri, name=asset_name)
+
+ asm = AssetModel(uri=asset_uri, name=asset_name, group="asset")
+ session.add(asm)
+
+ # Setup a Dag that is STALE but NOT PAUSED
+ # We want stale Dags to still receive asset updates
+ stale_dag = DagModel(dag_id="stale_dag", is_stale=True, is_paused=False, bundle_name=bundle_name)
+ session.add(stale_dag)
+
+ # Link the Stale Dag to the Asset
+ asm.scheduled_dags = [DagScheduleAssetReference(dag_id=stale_dag.dag_id)]
+
+ session.execute(delete(AssetDagRunQueue))
+ session.flush()
+
+ # Register the asset change
+ asset_manager.register_asset_change(
+ task_instance=mock_task_instance, asset=asset_definition, session=session
+ )
+ session.flush()
+
+ # Verify the stale Dag was NOT ignored
+ assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 1
+
+ queued_id = session.scalar(select(AssetDagRunQueue.target_dag_id))
+ assert queued_id == "stale_dag"
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 0cf22fdf98f..28ea1d41851 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -4654,7 +4654,10 @@ class TestSchedulerJob:
)
session.flush()
assert session.scalars(ase_q).one().source_run_id == dr1.run_id
- assert session.scalars(adrq_q).one_or_none() is None
+ if "is_stale" in disable:
+ assert session.scalars(adrq_q).one_or_none() is not None
+ else:
+ assert session.scalars(adrq_q).one_or_none() is None
# Simulate the consumer DAG being enabled.
session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**enable))
@@ -4668,6 +4671,7 @@ class TestSchedulerJob:
)
session.flush()
assert [e.source_run_id for e in session.scalars(ase_q)] == [dr1.run_id, dr2.run_id]
+ assert len(session.scalars(adrq_q).all()) == 1
assert session.scalars(adrq_q).one().target_dag_id == "consumer"
@time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)