This is an automated email from the ASF dual-hosted git repository.
shubhamraj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9d7b2a1c19e Add readable dags checks for the dependencies endpoint (#62046)
9d7b2a1c19e is described below
commit 9d7b2a1c19ef4adce9e9aadd1f7a036794f4483e
Author: Shubham Raj <[email protected]>
AuthorDate: Wed Feb 18 19:39:01 2026 +0530
Add readable dags checks for the dependencies endpoint (#62046)
---
.../api_fastapi/core_api/routes/ui/dependencies.py | 7 ++-
.../core_api/services/ui/dependencies.py | 17 ++++-
.../core_api/routes/ui/test_dependencies.py | 73 +++++++++++++++++++++-
3 files changed, 88 insertions(+), 9 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py
index 3f407f431d1..b2105e7edaf 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py
@@ -27,7 +27,7 @@ from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.common import BaseGraphResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
-from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.api_fastapi.core_api.security import ReadableDagsFilterDep, requires_access_dag
from airflow.api_fastapi.core_api.services.ui.dependencies import (
extract_single_connected_component,
get_data_dependencies,
@@ -48,6 +48,7 @@ dependencies_router = AirflowRouter(tags=["Dependencies"])
)
def get_dependencies(
session: SessionDep,
+ readable_dags_filter: ReadableDagsFilterDep,
node_id: str | None = None,
dependency_type: Literal["scheduling", "data"] = "scheduling",
) -> BaseGraphResponse:
@@ -63,10 +64,10 @@ def get_dependencies(
except ValueError:
raise HTTPException(status.HTTP_400_BAD_REQUEST, f"Invalid asset node_id: {node_id}")
- data = get_data_dependencies(asset_id, session)
+ data = get_data_dependencies(asset_id, session, readable_dags_filter.value)
return BaseGraphResponse(**data)
- data = get_scheduling_dependencies()
+ data = get_scheduling_dependencies(readable_dags_filter.value)
if node_id is not None:
try:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
index 3889ca1ed3c..3989ad56293 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
@@ -84,14 +84,17 @@ def extract_single_connected_component(
return {"nodes": nodes, "edges": edges}
-def get_scheduling_dependencies() -> dict[str, list[dict]]:
+def get_scheduling_dependencies(readable_dag_ids: set[str] | None = None) -> dict[str, list[dict]]:
"""Get scheduling dependencies between DAGs."""
from airflow.models.serialized_dag import SerializedDagModel
nodes_dict: dict[str, dict] = {}
edge_tuples: set[tuple[str, str]] = set()
- for dag, dependencies in sorted(SerializedDagModel.get_dag_dependencies().items()):
+ dag_dependencies = SerializedDagModel.get_dag_dependencies()
+ for dag, dependencies in sorted(dag_dependencies.items()):
+ if readable_dag_ids is not None and dag not in readable_dag_ids:
+ continue
dag_node_id = f"dag:{dag}"
if dag_node_id not in nodes_dict:
for dep in dependencies:
@@ -123,7 +126,9 @@ def get_scheduling_dependencies() -> dict[str, list[dict]]:
}
-def get_data_dependencies(asset_id: int, session: Session) -> dict[str, list[dict]]:
+def get_data_dependencies(
+ asset_id: int, session: Session, readable_dag_ids: set[str] | None = None
+) -> dict[str, list[dict]]:
"""Get full task dependencies for an asset."""
from sqlalchemy import select
from sqlalchemy.orm import selectinload
@@ -166,6 +171,9 @@ def get_data_dependencies(asset_id: int, session: Session) -> dict[str, list[dic
# Process producing tasks (tasks that output this asset)
for ref in asset.producing_tasks:
+ # Filter out tasks from Dags the user doesn't have access to
+ if readable_dag_ids is not None and ref.dag_id not in readable_dag_ids:
+ continue
task_key = (ref.dag_id, ref.task_id)
task_node_id = f"task:{ref.dag_id}{SEPARATOR}{ref.task_id}"
@@ -195,6 +203,9 @@ def get_data_dependencies(asset_id: int, session: Session) -> dict[str, list[dic
# Process consuming tasks (tasks that input this asset)
for ref in asset.consuming_tasks:
+ # Filter out tasks from Dags the user doesn't have access to
+ if readable_dag_ids is not None and ref.dag_id not in readable_dag_ids:
+ continue
task_key = (ref.dag_id, ref.task_id)
task_node_id = f"task:{ref.dag_id}{SEPARATOR}{ref.task_id}"
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
index 3b9cc00c308..3605d3e3a7a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
@@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations
+from unittest import mock
+
import pendulum
import pytest
from sqlalchemy import select
@@ -204,7 +206,7 @@ def expected_secondary_component_response(asset2_id):
class TestGetDependencies:
@pytest.mark.usefixtures("make_primary_connected_component")
def test_should_response_200(self, test_client, expected_primary_component_response):
- with assert_queries_count(5):
+ with assert_queries_count(6):
response = test_client.get("/dependencies")
assert response.status_code == 200
@@ -240,7 +242,7 @@ class TestGetDependencies:
@pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component")
def test_with_node_id_filter(self, test_client, node_id, expected_response_fixture, request):
expected_response = request.getfixturevalue(expected_response_fixture)
- with assert_queries_count(5):
+ with assert_queries_count(6):
response = test_client.get("/dependencies", params={"node_id": node_id})
assert response.status_code == 200
@@ -258,7 +260,7 @@ class TestGetDependencies:
(asset1_id, expected_primary_component_response),
(asset2_id, expected_secondary_component_response),
):
- with assert_queries_count(5):
+ with assert_queries_count(6):
response = test_client.get("/dependencies", params={"node_id": f"asset:{asset_id}"})
assert response.status_code == 200
@@ -332,3 +334,68 @@ class TestGetDependencies:
# Non-asset node_id
response = test_client.get("/dependencies", params={"dependency_type": "data", "node_id": "dag:test"})
assert response.status_code == 400
+
+ @mock.patch(
+ "airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids",
+ return_value={"upstream", "downstream"},
+ )
+ @pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component")
+ def test_scheduling_dependencies_respects_readable_dags_filter(self, _, test_client):
+ response = test_client.get("/dependencies")
+ assert response.status_code == 200
+
+ result = response.json()
+ dag_node_ids = {node["id"] for node in result["nodes"] if node["type"] == "dag"}
+ expected_present = ["dag:upstream", "dag:downstream"]
+ expected_absent = [
+ "dag:other_dag",
+ "dag:external_trigger_dag_id",
+ "dag:upstream_secondary",
+ "dag:downstream_secondary",
+ ]
+ for node_id in expected_present:
+ assert node_id in dag_node_ids
+ for node_id in expected_absent:
+ assert node_id not in dag_node_ids
+
+ @pytest.mark.parametrize(
+ ("readable_dags", "expected_present", "expected_absent"),
+ [
+ (
+ {"upstream"},
+ ["task:upstream__SEPARATOR__task2"],
+ [],
+ ),
+ (
+ {"downstream"},
+ [],
+ ["task:upstream__SEPARATOR__task2"],
+ ),
+ ],
+ )
+ @mock.patch("airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids")
+ @pytest.mark.usefixtures("make_primary_connected_component")
+ def test_data_dependencies_respects_readable_dags_filter(
+ self,
+ mock_get_authorized_dag_ids,
+ test_client,
+ asset1_id,
+ readable_dags,
+ expected_present,
+ expected_absent,
+ ):
+ mock_get_authorized_dag_ids.return_value = readable_dags
+
+ response = test_client.get(
+ "/dependencies",
+ params={"node_id": f"asset:{asset1_id}", "dependency_type": "data"},
+ )
+ assert response.status_code == 200
+
+ result = response.json()
+ nodes_by_id = {node["id"] for node in result["nodes"]}
+ assert f"asset:{asset1_id}" in nodes_by_id
+ for node_id in expected_present:
+ assert node_id in nodes_by_id
+ for node_id in expected_absent:
+ assert node_id not in nodes_by_id