This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d0142061ca Add additional permission check in asset materialization (#63338)
6d0142061ca is described below

commit 6d0142061cae9738e046490fc755e506cab7a44e
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Wed Mar 11 14:46:49 2026 +0100

    Add additional permission check in asset materialization (#63338)
---
 .../api_fastapi/core_api/routes/public/assets.py   | 13 +++++++++++++
 .../core_api/routes/public/test_assets.py          | 22 ++++++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index 65e90a321b1..68220c20e8f 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -26,6 +26,8 @@ from sqlalchemy.engine import CursorResult
 from sqlalchemy.orm import joinedload, subqueryload
 
 from airflow._shared.timezones import timezone
+from airflow.api_fastapi.app import get_auth_manager
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity, DagDetails
 from airflow.api_fastapi.common.dagbag import DagBagDep, 
get_latest_version_of_dag
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
 from airflow.api_fastapi.common.parameters import (
@@ -404,6 +406,17 @@ def materialize_asset(
             f"More than one DAG materializes asset with ID: {asset_id}",
         )
 
+    if not get_auth_manager().is_authorized_dag(
+        method="POST",
+        access_entity=DagAccessEntity.RUN,
+        details=DagDetails(id=dag_id),
+        user=user,
+    ):
+        raise HTTPException(
+            status.HTTP_403_FORBIDDEN,
+            f"User is not authorized to trigger a run for DAG: {dag_id} that 
materializes this asset",
+        )
+
     dag = get_latest_version_of_dag(dag_bag, dag_id, session)
 
     if dag.allowed_run_types is not None and DagRunType.ASSET_MATERIALIZATION 
not in dag.allowed_run_types:
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 4659bc8873f..7912e995d8e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -25,6 +25,7 @@ import time_machine
 from sqlalchemy import delete, func, select, update
 
 from airflow._shared.timezones import timezone
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity, DagDetails
 from airflow.models import DagModel
 from airflow.models.asset import (
     AssetActive,
@@ -1444,6 +1445,27 @@ class TestPostAssetMaterialize(TestAssets):
             == f"Dag with dag_id: '{self.DAG_ASSET1_ID}' does not allow asset 
materialization runs"
         )
 
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_should_respond_403_when_user_cannot_trigger_dag(self, 
test_client):
+        with mock.patch(
+            
"airflow.api_fastapi.core_api.routes.public.assets.get_auth_manager",
+            autospec=True,
+        ) as mock_get_auth_manager:
+            mock_get_auth_manager.return_value.is_authorized_dag.return_value 
= False
+
+            response = test_client.post("/assets/1/materialize")
+
+            assert response.status_code == 403
+            assert response.json()["detail"] == (
+                f"User is not authorized to trigger a run for DAG: 
{self.DAG_ASSET1_ID} that materializes this asset"
+            )
+            
mock_get_auth_manager.return_value.is_authorized_dag.assert_called_once_with(
+                method="POST",
+                access_entity=DagAccessEntity.RUN,
+                details=DagDetails(id=self.DAG_ASSET1_ID),
+                user=mock.ANY,
+            )
+
 
 class TestGetAssetQueuedEvents(TestQueuedEventEndpoint):
     @pytest.mark.usefixtures("time_freezer")

Reply via email to