This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b91394aa4cc fix(scheduler): skip asset-triggered dags without SerializedDagModel (#64322)
b91394aa4cc is described below
commit b91394aa4ccd94c34fb4c1b8d82c9b24c9577fbe
Author: Leonardo Soares <[email protected]>
AuthorDate: Sun Apr 5 07:57:16 2026 -0300
fix(scheduler): skip asset-triggered dags without SerializedDagModel (#64322)
* fix(scheduler): skip asset-triggered dags without SerializedDagModel in dags_needing_dagruns
Remove dag_ids that have no SerializedDagModel row from the in-memory
candidate set until serialization exists; keep their AssetDagRunQueue rows
and emit DEBUG logs. Add unit tests and a bugfix newsfragment.
* fix(scheduler): prevent premature asset-triggered DagRuns when SerializedDagModel is unavailable
* test(dag): persist DagModel before AssetDagRunQueue in unit tests
In TestDagModel, insert DagModel and AssetDagRunQueue rows separately and
flush after the DagModel insert, so rows are written in the order required
by the foreign-key constraints that production databases enforce.
* Apply suggestions from code review
Co-authored-by: Wei Lee <[email protected]>
* refactor(dag): clarify ADRQ skip log and condense serialized-DAG guard
Compute the set of Dags missing from the serialized_dag table directly in the
guard condition with a walrus assignment, and make the debug message clearer
when DagRun creation is skipped for Dags that have queued asset events but no
SerializedDagModel row.
* test(models): align caplog assertions with updated serialized dag warnings
* test(dag): align ADRQ missing-serialized log assertion with message text
* Apply suggestion from @jscheffl
Co-authored-by: Jens Scheffler <[email protected]>
* chore(newsfragments): remove 64322.bugfix.rst
---------
Co-authored-by: Wei Lee <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
---
airflow-core/src/airflow/models/dag.py | 14 ++++
airflow-core/tests/unit/models/test_dag.py | 130 ++++++++++++++++++++++++++++-
2 files changed, 143 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index c7d98dbe311..eff187c633f 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -631,6 +631,10 @@ class DagModel(Base):
you should ensure that any scheduling decisions are made in a single
transaction -- as soon as the
transaction is committed it will be unlocked.
+ For asset-triggered scheduling, Dags that have ``AssetDagRunQueue``
rows but no matching
+ ``SerializedDagModel`` row are omitted from ``triggered_date_by_dag``
until serialization exists;
+ ADRQs are **not** deleted here so the scheduler can re-evaluate on a
later run.
+
:meta private:
"""
from airflow.models.serialized_dag import SerializedDagModel
@@ -677,6 +681,16 @@ class DagModel(Base):
for dag_id, adrqs in adrq_by_dag.items()
}
ser_dags =
SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses),
session=session)
+ ser_dag_ids = {ser_dag.dag_id for ser_dag in ser_dags}
+ if missing_from_serialized := set(adrq_by_dag.keys()) - ser_dag_ids:
+ log.info(
+ "Dags have queued asset events (ADRQ), but are not found in
the serialized_dag table."
+ " — skipping Dag run creation: %s",
+ sorted(missing_from_serialized),
+ )
+ for dag_id in missing_from_serialized:
+ del adrq_by_dag[dag_id]
+ del dag_statuses[dag_id]
for ser_dag in ser_dags:
dag_id = ser_dag.dag_id
statuses = dag_statuses[dag_id]
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index 046c85ea799..00a0f1a1ef2 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -33,7 +33,7 @@ import jinja2
import pendulum
import pytest
import time_machine
-from sqlalchemy import delete, inspect, select, update
+from sqlalchemy import delete, func, inspect, select, update
from airflow import settings
from airflow._shared.module_loading import qualname
@@ -2047,6 +2047,134 @@ class TestDagModel:
dag_models = query.all()
assert dag_models == [dag_model]
+ def test_dags_needing_dagruns_skips_adrq_when_serialized_dag_missing(
+ self, session, caplog, testing_dag_bundle
+ ):
+ """A Dag with ADRQ rows but no SerializedDagModel must be omitted from
``triggered_date_by_dag``.
+
+ Its rows must remain in ``asset_dag_run_queue`` so the scheduler can
re-evaluate on a later run once
+ ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops
them from the in-memory
+ candidate set; it does not delete ORM rows).
+ """
+ orphan_dag_id = "adrq_no_serialized_dag"
+ orphan_uri = "test://asset_for_orphan_adrq"
+ session.add(AssetModel(uri=orphan_uri))
+ session.flush()
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== orphan_uri))
+
+ dag_model = DagModel(
+ dag_id=orphan_dag_id,
+ bundle_name="testing",
+ max_active_tasks=1,
+ has_task_concurrency_limits=False,
+ max_consecutive_failed_dag_runs=0,
+ next_dagrun=timezone.datetime(2038, 1, 1),
+ next_dagrun_create_after=timezone.datetime(2038, 1, 2),
+ is_stale=False,
+ has_import_errors=False,
+ is_paused=False,
+ asset_expression={"any": [{"uri": orphan_uri}]},
+ )
+ session.add(dag_model)
+ session.flush()  # write the DagModel first so the ADRQ insert below satisfies its foreign key
+
+ session.add(AssetDagRunQueue(asset_id=asset_id,
target_dag_id=orphan_dag_id))
+ session.flush()
+
+ with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):  # DEBUG threshold also captures the INFO-level skip message
+ _query, triggered_date_by_dag =
DagModel.dags_needing_dagruns(session)
+
+ assert orphan_dag_id not in triggered_date_by_dag  # no DagRun candidate without a serialized Dag
+ assert (
+ "Dags have queued asset events (ADRQ), but are not found in the
serialized_dag table."
+ in caplog.text
+ )
+ assert orphan_dag_id in caplog.text
+ assert (  # skipping must not delete the queued ADRQ row
+ session.scalar(
+ select(func.count())
+ .select_from(AssetDagRunQueue)
+ .where(AssetDagRunQueue.target_dag_id == orphan_dag_id)
+ )
+ == 1
+ )
+
+ def
test_dags_needing_dagruns_missing_serialized_debug_lists_sorted_dag_ids(
+ self, session, caplog, testing_dag_bundle
+ ):
+ """When multiple dags lack SerializedDagModel, the skip log message lists
their dag_ids in sorted order."""
+ session.add_all(
+ [
+ AssetModel(uri="test://ds_ghost_z"),
+ AssetModel(uri="test://ds_ghost_a"),
+ ]
+ )
+ session.flush()
+ id_z = session.scalar(select(AssetModel.id).where(AssetModel.uri ==
"test://ds_ghost_z"))
+ id_a = session.scalar(select(AssetModel.id).where(AssetModel.uri ==
"test://ds_ghost_a"))
+ far = timezone.datetime(2038, 1, 1)
+ far_after = timezone.datetime(2038, 1, 2)
+ session.add_all(
+ [
+ DagModel(
+ dag_id="ghost_z",
+ bundle_name="testing",
+ max_active_tasks=1,
+ has_task_concurrency_limits=False,
+ max_consecutive_failed_dag_runs=0,
+ next_dagrun=far,
+ next_dagrun_create_after=far_after,
+ is_stale=False,
+ has_import_errors=False,
+ is_paused=False,
+ asset_expression={"any": [{"uri": "test://ds_ghost_z"}]},
+ ),
+ DagModel(
+ dag_id="ghost_a",
+ bundle_name="testing",
+ max_active_tasks=1,
+ has_task_concurrency_limits=False,
+ max_consecutive_failed_dag_runs=0,
+ next_dagrun=far,
+ next_dagrun_create_after=far_after,
+ is_stale=False,
+ has_import_errors=False,
+ is_paused=False,
+ asset_expression={"any": [{"uri": "test://ds_ghost_a"}]},
+ ),
+ ]
+ )
+ session.flush()  # write the DagModel rows first so the ADRQ inserts below satisfy their foreign keys
+
+ session.add_all(
+ [
+ AssetDagRunQueue(asset_id=id_z, target_dag_id="ghost_z"),
+ AssetDagRunQueue(asset_id=id_a, target_dag_id="ghost_a"),
+ ]
+ )
+ session.flush()
+
+ with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):  # DEBUG threshold also captures the INFO-level skip message
+ _query, triggered_date_by_dag =
DagModel.dags_needing_dagruns(session)
+
+ assert "ghost_a" not in triggered_date_by_dag
+ assert "ghost_z" not in triggered_date_by_dag
+ msg = next(  # first captured record carrying the skip message
+ r.message
+ for r in caplog.records
+ if "Dags have queued asset events (ADRQ), but are not found in the
serialized_dag table."
+ in r.message
+ )
+ assert msg.index("ghost_a") < msg.index("ghost_z")  # ghost_z was inserted first, so this checks sorting, not insertion order
+ assert (  # skipping must not delete either ADRQ row
+ session.scalar(
+ select(func.count())
+ .select_from(AssetDagRunQueue)
+ .where(AssetDagRunQueue.target_dag_id.in_(("ghost_a",
"ghost_z")))
+ )
+ == 2
+ )
+
def test_dags_needing_dagruns_query_count(self, dag_maker, session):
"""Test that dags_needing_dagruns avoids N+1 on adrq.asset access."""
num_assets = 10