This is an automated email from the ASF dual-hosted git repository.

shubhamraj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d7b2a1c19e Add readable dags checks for the dependencies endpoint 
(#62046)
9d7b2a1c19e is described below

commit 9d7b2a1c19ef4adce9e9aadd1f7a036794f4483e
Author: Shubham Raj <[email protected]>
AuthorDate: Wed Feb 18 19:39:01 2026 +0530

    Add readable dags checks for the dependencies endpoint (#62046)
---
 .../api_fastapi/core_api/routes/ui/dependencies.py |  7 ++-
 .../core_api/services/ui/dependencies.py           | 17 ++++-
 .../core_api/routes/ui/test_dependencies.py        | 73 +++++++++++++++++++++-
 3 files changed, 88 insertions(+), 9 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py
index 3f407f431d1..b2105e7edaf 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py
@@ -27,7 +27,7 @@ from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.ui.common import BaseGraphResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
-from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.api_fastapi.core_api.security import ReadableDagsFilterDep, 
requires_access_dag
 from airflow.api_fastapi.core_api.services.ui.dependencies import (
     extract_single_connected_component,
     get_data_dependencies,
@@ -48,6 +48,7 @@ dependencies_router = AirflowRouter(tags=["Dependencies"])
 )
 def get_dependencies(
     session: SessionDep,
+    readable_dags_filter: ReadableDagsFilterDep,
     node_id: str | None = None,
     dependency_type: Literal["scheduling", "data"] = "scheduling",
 ) -> BaseGraphResponse:
@@ -63,10 +64,10 @@ def get_dependencies(
         except ValueError:
             raise HTTPException(status.HTTP_400_BAD_REQUEST, f"Invalid asset 
node_id: {node_id}")
 
-        data = get_data_dependencies(asset_id, session)
+        data = get_data_dependencies(asset_id, session, 
readable_dags_filter.value)
         return BaseGraphResponse(**data)
 
-    data = get_scheduling_dependencies()
+    data = get_scheduling_dependencies(readable_dags_filter.value)
 
     if node_id is not None:
         try:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
index 3889ca1ed3c..3989ad56293 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
@@ -84,14 +84,17 @@ def extract_single_connected_component(
     return {"nodes": nodes, "edges": edges}
 
 
-def get_scheduling_dependencies() -> dict[str, list[dict]]:
+def get_scheduling_dependencies(readable_dag_ids: set[str] | None = None) -> 
dict[str, list[dict]]:
     """Get scheduling dependencies between DAGs."""
     from airflow.models.serialized_dag import SerializedDagModel
 
     nodes_dict: dict[str, dict] = {}
     edge_tuples: set[tuple[str, str]] = set()
 
-    for dag, dependencies in 
sorted(SerializedDagModel.get_dag_dependencies().items()):
+    dag_dependencies = SerializedDagModel.get_dag_dependencies()
+    for dag, dependencies in sorted(dag_dependencies.items()):
+        if readable_dag_ids is not None and dag not in readable_dag_ids:
+            continue
         dag_node_id = f"dag:{dag}"
         if dag_node_id not in nodes_dict:
             for dep in dependencies:
@@ -123,7 +126,9 @@ def get_scheduling_dependencies() -> dict[str, list[dict]]:
     }
 
 
-def get_data_dependencies(asset_id: int, session: Session) -> dict[str, 
list[dict]]:
+def get_data_dependencies(
+    asset_id: int, session: Session, readable_dag_ids: set[str] | None = None
+) -> dict[str, list[dict]]:
     """Get full task dependencies for an asset."""
     from sqlalchemy import select
     from sqlalchemy.orm import selectinload
@@ -166,6 +171,9 @@ def get_data_dependencies(asset_id: int, session: Session) 
-> dict[str, list[dic
 
         # Process producing tasks (tasks that output this asset)
         for ref in asset.producing_tasks:
+            # Filter out tasks from Dags the user doesn't have access to
+            if readable_dag_ids is not None and ref.dag_id not in 
readable_dag_ids:
+                continue
             task_key = (ref.dag_id, ref.task_id)
             task_node_id = f"task:{ref.dag_id}{SEPARATOR}{ref.task_id}"
 
@@ -195,6 +203,9 @@ def get_data_dependencies(asset_id: int, session: Session) 
-> dict[str, list[dic
 
         # Process consuming tasks (tasks that input this asset)
         for ref in asset.consuming_tasks:
+            # Filter out tasks from Dags the user doesn't have access to
+            if readable_dag_ids is not None and ref.dag_id not in 
readable_dag_ids:
+                continue
             task_key = (ref.dag_id, ref.task_id)
             task_node_id = f"task:{ref.dag_id}{SEPARATOR}{ref.task_id}"
 
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
index 3b9cc00c308..3605d3e3a7a 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
@@ -16,6 +16,8 @@
 # under the License.
 from __future__ import annotations
 
+from unittest import mock
+
 import pendulum
 import pytest
 from sqlalchemy import select
@@ -204,7 +206,7 @@ def expected_secondary_component_response(asset2_id):
 class TestGetDependencies:
     @pytest.mark.usefixtures("make_primary_connected_component")
     def test_should_response_200(self, test_client, 
expected_primary_component_response):
-        with assert_queries_count(5):
+        with assert_queries_count(6):
             response = test_client.get("/dependencies")
         assert response.status_code == 200
 
@@ -240,7 +242,7 @@ class TestGetDependencies:
     @pytest.mark.usefixtures("make_primary_connected_component", 
"make_secondary_connected_component")
     def test_with_node_id_filter(self, test_client, node_id, 
expected_response_fixture, request):
         expected_response = request.getfixturevalue(expected_response_fixture)
-        with assert_queries_count(5):
+        with assert_queries_count(6):
             response = test_client.get("/dependencies", params={"node_id": 
node_id})
         assert response.status_code == 200
 
@@ -258,7 +260,7 @@ class TestGetDependencies:
             (asset1_id, expected_primary_component_response),
             (asset2_id, expected_secondary_component_response),
         ):
-            with assert_queries_count(5):
+            with assert_queries_count(6):
                 response = test_client.get("/dependencies", params={"node_id": 
f"asset:{asset_id}"})
             assert response.status_code == 200
 
@@ -332,3 +334,68 @@ class TestGetDependencies:
         # Non-asset node_id
         response = test_client.get("/dependencies", params={"dependency_type": 
"data", "node_id": "dag:test"})
         assert response.status_code == 400
+
+    @mock.patch(
+        
"airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids",
+        return_value={"upstream", "downstream"},
+    )
+    @pytest.mark.usefixtures("make_primary_connected_component", 
"make_secondary_connected_component")
+    def test_scheduling_dependencies_respects_readable_dags_filter(self, _, 
test_client):
+        response = test_client.get("/dependencies")
+        assert response.status_code == 200
+
+        result = response.json()
+        dag_node_ids = {node["id"] for node in result["nodes"] if node["type"] 
== "dag"}
+        expected_present = ["dag:upstream", "dag:downstream"]
+        expected_absent = [
+            "dag:other_dag",
+            "dag:external_trigger_dag_id",
+            "dag:upstream_secondary",
+            "dag:downstream_secondary",
+        ]
+        for node_id in expected_present:
+            assert node_id in dag_node_ids
+        for node_id in expected_absent:
+            assert node_id not in dag_node_ids
+
+    @pytest.mark.parametrize(
+        ("readable_dags", "expected_present", "expected_absent"),
+        [
+            (
+                {"upstream"},
+                ["task:upstream__SEPARATOR__task2"],
+                [],
+            ),
+            (
+                {"downstream"},
+                [],
+                ["task:upstream__SEPARATOR__task2"],
+            ),
+        ],
+    )
+    
@mock.patch("airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids")
+    @pytest.mark.usefixtures("make_primary_connected_component")
+    def test_data_dependencies_respects_readable_dags_filter(
+        self,
+        mock_get_authorized_dag_ids,
+        test_client,
+        asset1_id,
+        readable_dags,
+        expected_present,
+        expected_absent,
+    ):
+        mock_get_authorized_dag_ids.return_value = readable_dags
+
+        response = test_client.get(
+            "/dependencies",
+            params={"node_id": f"asset:{asset1_id}", "dependency_type": 
"data"},
+        )
+        assert response.status_code == 200
+
+        result = response.json()
+        nodes_by_id = {node["id"] for node in result["nodes"]}
+        assert f"asset:{asset1_id}" in nodes_by_id
+        for node_id in expected_present:
+            assert node_id in nodes_by_id
+        for node_id in expected_absent:
+            assert node_id not in nodes_by_id

Reply via email to