This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 377d786e0bb Do not include inactive dags in dependencies (#47906)
377d786e0bb is described below

commit 377d786e0bbf8617cee0ed0b294df7b911c773c5
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Mar 19 12:16:30 2025 +0800

    Do not include inactive dags in dependencies (#47906)
    
    This adds a filter to exclude non-active (removed from dag file) dags in
    the dag dependency rendering. This has various side effects, most
    importantly, the UI would now correctly consider an asset to not have an
    upstream (when the upstream dag is removed and no longer runnable), and
    disable the materialize button.
---
 airflow/models/serialized_dag.py    | 18 +++++++++++-------
 tests/models/test_serialized_dag.py | 16 ++++++++++++++--
 2 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 663629c82c1..5fae9441052 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -464,26 +464,30 @@ class SerializedDagModel(Base):
         :param session: ORM Session
         """
         latest_sdag_subquery = (
-            session.query(cls.dag_id, 
func.max(cls.created_at).label("max_created"))
-            .group_by(cls.dag_id)
-            .subquery()
+            select(cls.dag_id, 
func.max(cls.created_at).label("max_created")).group_by(cls.dag_id).subquery()
         )
         if session.bind.dialect.name in ["sqlite", "mysql"]:
             query = session.execute(
-                select(cls.dag_id, func.json_extract(cls._data, 
"$.dag.dag_dependencies")).join(
+                select(cls.dag_id, func.json_extract(cls._data, 
"$.dag.dag_dependencies"))
+                .join(
                     latest_sdag_subquery,
                     (cls.dag_id == latest_sdag_subquery.c.dag_id)
-                    and (cls.created_at == latest_sdag_subquery.c.max_created),
+                    & (cls.created_at == latest_sdag_subquery.c.max_created),
                 )
+                .join(cls.dag_model)
+                .where(DagModel.is_active)
             )
             iterator = ((dag_id, json.loads(deps_data) if deps_data else []) 
for dag_id, deps_data in query)
         else:
             iterator = session.execute(
-                select(cls.dag_id, func.json_extract_path(cls._data, "dag", 
"dag_dependencies")).join(
+                select(cls.dag_id, func.json_extract_path(cls._data, "dag", 
"dag_dependencies"))
+                .join(
                     latest_sdag_subquery,
                     (cls.dag_id == latest_sdag_subquery.c.dag_id)
-                    and (cls.created_at == latest_sdag_subquery.c.max_created),
+                    & (cls.created_at == latest_sdag_subquery.c.max_created),
                 )
+                .join(cls.dag_model)
+                .where(DagModel.is_active)
             )
         return {dag_id: [DagDependency(**d) for d in (deps_data or [])] for 
dag_id, deps_data in iterator}
 
diff --git a/tests/models/test_serialized_dag.py 
b/tests/models/test_serialized_dag.py
index 9c03f25bddf..f3fbe1ac934 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -23,11 +23,11 @@ from unittest import mock
 
 import pendulum
 import pytest
-from sqlalchemy import func, select
+from sqlalchemy import func, select, update
 
 import airflow.example_dags as example_dags_module
 from airflow.decorators import task as task_decorator
-from airflow.models.dag import DAG
+from airflow.models.dag import DAG, DagModel
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagbag import DagBag
 from airflow.models.serialized_dag import SerializedDagModel as SDM
@@ -371,3 +371,15 @@ class TestSerializedDagModel:
                 dag.tags = sorted(dag.tags, reverse=True)
             sorted_dag = SDM._sort_serialized_dag_dict(dag)
             assert sorted_dag == dag
+
+    def test_get_dependencies(self, session):
+        self._write_example_dags()
+        dag_id = "consumes_asset_decorator"
+
+        dependencies = SDM.get_dag_dependencies(session=session)
+        assert dag_id in dependencies
+
+        # Simulate deleting the DAG from file.
+        session.execute(update(DagModel).where(DagModel.dag_id == 
dag_id).values(is_active=False))
+        dependencies = SDM.get_dag_dependencies(session=session)
+        assert dag_id not in dependencies

Reply via email to