This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 8f644c87371 [v3-1-test] Fix asset scheduling for stale DAGs (#59337) (#60022) (#61106)
8f644c87371 is described below
commit 8f644c873711e5b796534ad44737d411b030cbc8
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jan 28 12:26:59 2026 +0800
[v3-1-test] Fix asset scheduling for stale DAGs (#59337) (#60022) (#61106)
Co-authored-by: Jason K Hall <[email protected]>
Co-authored-by: Wei Lee <[email protected]>
Fix asset scheduling for stale DAGs (#59337) (#60022)
---
airflow-core/src/airflow/assets/manager.py | 9 +++--
airflow-core/tests/unit/assets/test_manager.py | 40 +++++++++++++++++++++-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 6 +++-
3 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index c0dce3861ec..17d03cb2758 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -164,9 +164,7 @@ class AssetManager(LoggingMixin):
session.add(asset_event)
session.flush() # Ensure the event is written earlier than DDRQ
entries below.
- dags_to_queue_from_asset = {
- ref.dag for ref in asset_model.scheduled_dags if not
ref.dag.is_stale and not ref.dag.is_paused
- }
+ dags_to_queue_from_asset = {ref.dag for ref in
asset_model.scheduled_dags if not ref.dag.is_paused}
dags_to_queue_from_asset_alias = set()
if source_alias_names:
@@ -185,7 +183,7 @@ class AssetManager(LoggingMixin):
dags_to_queue_from_asset_alias |= {
alias_ref.dag
for alias_ref in asset_alias_model.scheduled_dags
- if not alias_ref.dag.is_stale and not
alias_ref.dag.is_paused
+ if not alias_ref.dag.is_paused
}
dags_to_queue_from_asset_ref = set(
@@ -197,7 +195,8 @@ class AssetManager(LoggingMixin):
or_(
DagScheduleAssetNameReference.name == asset.name,
DagScheduleAssetUriReference.uri == asset.uri,
- )
+ ),
+ DagModel.is_paused.is_(False),
)
)
)
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index 94b63e72501..330522cbda1 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -21,7 +21,7 @@ import itertools
from unittest import mock
import pytest
-from sqlalchemy import delete
+from sqlalchemy import delete, func, select
from sqlalchemy.orm import Session
from airflow.assets.manager import AssetManager
@@ -204,3 +204,41 @@ class TestAssetManager:
assert len(asset_listener.created) == 1
assert len(asms) == 1
assert asset_listener.created[0].uri == asset.uri == asms[0].uri
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_register_asset_change_queues_stale_dag(self, session,
mock_task_instance):
+ asset_manager = AssetManager()
+ asset_listener.clear()
+ bundle_name = "testing"
+
+ # Setup an Asset
+ asset_uri = "test://stale_asset/"
+ asset_name = "test_stale_asset"
+ asset_definition = Asset(uri=asset_uri, name=asset_name)
+
+ asm = AssetModel(uri=asset_uri, name=asset_name, group="asset")
+ session.add(asm)
+
+ # Setup a Dag that is STALE but NOT PAUSED
+ # We want stale Dags to still receive asset updates
+ stale_dag = DagModel(dag_id="stale_dag", is_stale=True,
is_paused=False, bundle_name=bundle_name)
+ session.add(stale_dag)
+
+ # Link the Stale Dag to the Asset
+ asm.scheduled_dags =
[DagScheduleAssetReference(dag_id=stale_dag.dag_id)]
+
+ session.execute(delete(AssetDagRunQueue))
+ session.flush()
+
+ # Register the asset change
+ asset_manager.register_asset_change(
+ task_instance=mock_task_instance, asset=asset_definition,
session=session
+ )
+ session.flush()
+
+ # Verify the stale Dag was NOT ignored
+ assert
session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 1
+
+ queued_id = session.scalar(select(AssetDagRunQueue.target_dag_id))
+ assert queued_id == "stale_dag"
+ asset_listener.clear()
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 10f58785e14..aba109f9d41 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -4626,7 +4626,10 @@ class TestSchedulerJob:
)
session.flush()
assert session.scalars(ase_q).one().source_run_id == dr1.run_id
- assert session.scalars(adrq_q).one_or_none() is None
+ if "is_stale" in disable:
+ assert session.scalars(adrq_q).one_or_none() is not None
+ else:
+ assert session.scalars(adrq_q).one_or_none() is None
# Simulate the consumer DAG being enabled.
session.execute(update(DagModel).where(DagModel.dag_id ==
"consumer").values(**enable))
@@ -4640,6 +4643,7 @@ class TestSchedulerJob:
)
session.flush()
assert [e.source_run_id for e in session.scalars(ase_q)] ==
[dr1.run_id, dr2.run_id]
+ assert len(session.scalars(adrq_q).all()) == 1
assert session.scalars(adrq_q).one().target_dag_id == "consumer"
@time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9),
tick=False)