This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b91394aa4cc fix(scheduler): skip asset-triggered dags without SerializedDagModel (#64322)
b91394aa4cc is described below

commit b91394aa4ccd94c34fb4c1b8d82c9b24c9577fbe
Author: Leonardo Soares <[email protected]>
AuthorDate: Sun Apr 5 07:57:16 2026 -0300

    fix(scheduler): skip asset-triggered dags without SerializedDagModel (#64322)
    
    * fix(scheduler): skip asset-triggered dags without SerializedDagModel in dags_needing_dagruns
    
    Remove dag_ids that have AssetDagRunQueue (ADRQ) rows but no SerializedDagModel
    row from the in-memory candidate set until serialization exists; retain the
    ADRQ rows and log the skipped dag_ids. Add unit tests and a bugfix newsfragment.
    
    * fix(scheduler): prevent premature asset-triggered DagRuns when SerializedDagModel is unavailable
    
    * test(dag): persist DagModel before AssetDagRunQueue in unit tests
    
    In TestDagModel, split the DagModel and AssetDagRunQueue inserts and flush
    after adding the DagModel, so that the insert order satisfies the
    foreign-key constraints enforced by production databases.
    
    * Apply suggestions from code review
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * refactor(dag): clarify ADRQ skip log and condense serialized-DAG guard
    
    Combine the missing-from-serialized set check with a walrus assignment, and
    improve the log message emitted when DagRun creation is skipped for Dags that
    have queued asset events but no SerializedDagModel row.
    
    * test(models): align caplog assertions with updated serialized dag warnings
    
    * test(dag): align ADRQ missing-serialized log assertion with message text
    
    * Apply suggestion from @jscheffl
    
    Co-authored-by: Jens Scheffler <[email protected]>
    
    * chore(newsfragments): remove 64322.bugfix.rst
    
    ---------
    
    Co-authored-by: Wei Lee <[email protected]>
    Co-authored-by: Jens Scheffler <[email protected]>
---
 airflow-core/src/airflow/models/dag.py     |  14 ++++
 airflow-core/tests/unit/models/test_dag.py | 130 ++++++++++++++++++++++++++++-
 2 files changed, 143 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index c7d98dbe311..eff187c633f 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -631,6 +631,10 @@ class DagModel(Base):
         you should ensure that any scheduling decisions are made in a single 
transaction -- as soon as the
         transaction is committed it will be unlocked.
 
+        For asset-triggered scheduling, Dags that have ``AssetDagRunQueue`` 
rows but no matching
+        ``SerializedDagModel`` row are omitted from ``triggered_date_by_dag`` 
until serialization exists;
+        ADRQs are **not** deleted here so the scheduler can re-evaluate on a 
later run.
+
         :meta private:
         """
         from airflow.models.serialized_dag import SerializedDagModel
@@ -677,6 +681,16 @@ class DagModel(Base):
             for dag_id, adrqs in adrq_by_dag.items()
         }
         ser_dags = 
SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses), 
session=session)
+        ser_dag_ids = {ser_dag.dag_id for ser_dag in ser_dags}
+        if missing_from_serialized := set(adrq_by_dag.keys()) - ser_dag_ids:
+            log.info(
+                "Dags have queued asset events (ADRQ), but are not found in 
the serialized_dag table."
+                " — skipping Dag run creation: %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del adrq_by_dag[dag_id]
+                del dag_statuses[dag_id]
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 046c85ea799..00a0f1a1ef2 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -33,7 +33,7 @@ import jinja2
 import pendulum
 import pytest
 import time_machine
-from sqlalchemy import delete, inspect, select, update
+from sqlalchemy import delete, func, inspect, select, update
 
 from airflow import settings
 from airflow._shared.module_loading import qualname
@@ -2047,6 +2047,134 @@ class TestDagModel:
         dag_models = query.all()
         assert dag_models == [dag_model]
 
+    def test_dags_needing_dagruns_skips_adrq_when_serialized_dag_missing(
+        self, session, caplog, testing_dag_bundle
+    ):
+        """ADRQ rows for a Dag without SerializedDagModel must be skipped (no 
triggered_date_by_dag).
+
+        Rows must remain in ``asset_dag_run_queue`` so the scheduler can 
re-evaluate on a later run once
+        ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops 
them from the in-memory
+        candidate set, it does not delete ORM rows).
+        """
+        orphan_dag_id = "adrq_no_serialized_dag"
+        orphan_uri = "test://asset_for_orphan_adrq"
+        session.add(AssetModel(uri=orphan_uri))
+        session.flush()
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== orphan_uri))
+
+        dag_model = DagModel(
+            dag_id=orphan_dag_id,
+            bundle_name="testing",
+            max_active_tasks=1,
+            has_task_concurrency_limits=False,
+            max_consecutive_failed_dag_runs=0,
+            next_dagrun=timezone.datetime(2038, 1, 1),
+            next_dagrun_create_after=timezone.datetime(2038, 1, 2),
+            is_stale=False,
+            has_import_errors=False,
+            is_paused=False,
+            asset_expression={"any": [{"uri": orphan_uri}]},
+        )
+        session.add(dag_model)
+        session.flush()
+
+        session.add(AssetDagRunQueue(asset_id=asset_id, 
target_dag_id=orphan_dag_id))
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, triggered_date_by_dag = 
DagModel.dags_needing_dagruns(session)
+
+        assert orphan_dag_id not in triggered_date_by_dag
+        assert (
+            "Dags have queued asset events (ADRQ), but are not found in the 
serialized_dag table."
+            in caplog.text
+        )
+        assert orphan_dag_id in caplog.text
+        assert (
+            session.scalar(
+                select(func.count())
+                .select_from(AssetDagRunQueue)
+                .where(AssetDagRunQueue.target_dag_id == orphan_dag_id)
+            )
+            == 1
+        )
+
+    def 
test_dags_needing_dagruns_missing_serialized_debug_lists_sorted_dag_ids(
+        self, session, caplog, testing_dag_bundle
+    ):
+        """When multiple dags lack SerializedDagModel, the debug log lists 
dag_ids sorted."""
+        session.add_all(
+            [
+                AssetModel(uri="test://ds_ghost_z"),
+                AssetModel(uri="test://ds_ghost_a"),
+            ]
+        )
+        session.flush()
+        id_z = session.scalar(select(AssetModel.id).where(AssetModel.uri == 
"test://ds_ghost_z"))
+        id_a = session.scalar(select(AssetModel.id).where(AssetModel.uri == 
"test://ds_ghost_a"))
+        far = timezone.datetime(2038, 1, 1)
+        far_after = timezone.datetime(2038, 1, 2)
+        session.add_all(
+            [
+                DagModel(
+                    dag_id="ghost_z",
+                    bundle_name="testing",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    max_consecutive_failed_dag_runs=0,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_stale=False,
+                    has_import_errors=False,
+                    is_paused=False,
+                    asset_expression={"any": [{"uri": "test://ds_ghost_z"}]},
+                ),
+                DagModel(
+                    dag_id="ghost_a",
+                    bundle_name="testing",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    max_consecutive_failed_dag_runs=0,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_stale=False,
+                    has_import_errors=False,
+                    is_paused=False,
+                    asset_expression={"any": [{"uri": "test://ds_ghost_a"}]},
+                ),
+            ]
+        )
+        session.flush()
+
+        session.add_all(
+            [
+                AssetDagRunQueue(asset_id=id_z, target_dag_id="ghost_z"),
+                AssetDagRunQueue(asset_id=id_a, target_dag_id="ghost_a"),
+            ]
+        )
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, triggered_date_by_dag = 
DagModel.dags_needing_dagruns(session)
+
+        assert "ghost_a" not in triggered_date_by_dag
+        assert "ghost_z" not in triggered_date_by_dag
+        msg = next(
+            r.message
+            for r in caplog.records
+            if "Dags have queued asset events (ADRQ), but are not found in the 
serialized_dag table."
+            in r.message
+        )
+        assert msg.index("ghost_a") < msg.index("ghost_z")
+        assert (
+            session.scalar(
+                select(func.count())
+                .select_from(AssetDagRunQueue)
+                .where(AssetDagRunQueue.target_dag_id.in_(("ghost_a", 
"ghost_z")))
+            )
+            == 2
+        )
+
     def test_dags_needing_dagruns_query_count(self, dag_maker, session):
         """Test that dags_needing_dagruns avoids N+1 on adrq.asset access."""
         num_assets = 10

Reply via email to