This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 60ff4669c9b Consumer team asset filtering: add API endpoint support 
(#68034)
60ff4669c9b is described below

commit 60ff4669c9b265c6e43ec4339ea447663328d8cc
Author: Vincent <[email protected]>
AuthorDate: Mon Jun 8 12:27:54 2026 -0400

    Consumer team asset filtering: add API endpoint support (#68034)
---
 .../api_fastapi/core_api/datamodels/assets.py      |  8 ++
 .../core_api/openapi/v2-rest-api-generated.yaml    | 21 ++++++
 .../api_fastapi/core_api/routes/public/assets.py   |  7 ++
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 38 ++++++++++
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  9 +++
 .../core_api/routes/public/test_assets.py          | 86 +++++++++++++++++-----
 .../src/airflowctl/api/datamodels/generated.py     | 13 ++++
 7 files changed, 163 insertions(+), 19 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
index f463f3ba5e3..753ea94f1d9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -179,12 +179,20 @@ class QueuedEventCollectionResponse(BaseModel):
     total_entries: int
 
 
+class AssetEventAccessControl(StrictBaseModel):
+    """Access control settings for asset event consumer team filtering."""
+
+    consumer_teams: list[str] | None = None
+    allow_global: bool = True
+
+
 class CreateAssetEventsBody(StrictBaseModel):
     """Create asset events request."""
 
     asset_id: int
     partition_key: str | None = None
     extra: dict = Field(default_factory=dict)
+    access_control: AssetEventAccessControl | None = None
 
     @field_validator("extra", mode="after")
     def set_from_rest_api(cls, v: dict) -> dict:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 185da9a9e69..1d82fb555d5 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11430,6 +11430,23 @@ components:
       - total_entries
       title: AssetCollectionResponse
       description: Asset collection response.
+    AssetEventAccessControl:
+      properties:
+        consumer_teams:
+          anyOf:
+          - items:
+              type: string
+            type: array
+          - type: 'null'
+          title: Consumer Teams
+        allow_global:
+          type: boolean
+          title: Allow Global
+          default: true
+      additionalProperties: false
+      type: object
+      title: AssetEventAccessControl
+      description: Access control settings for asset event consumer team 
filtering.
     AssetEventCollectionResponse:
       properties:
         asset_events:
@@ -12947,6 +12964,10 @@ components:
           additionalProperties: true
           type: object
           title: Extra
+        access_control:
+          anyOf:
+          - $ref: '#/components/schemas/AssetEventAccessControl'
+          - type: 'null'
       additionalProperties: false
       type: object
       required:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index 5e3afa8b3c0..9525b1f796d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -375,8 +375,13 @@ def create_asset_event(
     timestamp = timezone.utcnow()
 
     api_user_teams: set[str] = set()
+    api_allow_consumer_teams: list[str] | None = None
+    api_allow_global_consumers: bool = True
     if conf.getboolean("core", "multi_team"):
         api_user_teams = get_auth_manager().get_authorized_teams(user=user)
+        if body.access_control:
+            api_allow_consumer_teams = body.access_control.consumer_teams or 
None
+            api_allow_global_consumers = body.access_control.allow_global
 
     assets_event = asset_manager.register_asset_change(
         asset=asset_model,
@@ -385,6 +390,8 @@ def create_asset_event(
         partition_key=body.partition_key,
         source_is_api=True,
         api_user_teams=api_user_teams,
+        api_allow_consumer_teams=api_allow_consumer_teams,
+        api_allow_global_consumers=api_allow_global_consumers,
         session=session,
     )
 
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index fbebbf10c8e..651d22d03b5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -143,6 +143,34 @@ export const $AssetCollectionResponse = {
     description: 'Asset collection response.'
 } as const;
 
+export const $AssetEventAccessControl = {
+    properties: {
+        consumer_teams: {
+            anyOf: [
+                {
+                    items: {
+                        type: 'string'
+                    },
+                    type: 'array'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Consumer Teams'
+        },
+        allow_global: {
+            type: 'boolean',
+            title: 'Allow Global',
+            default: true
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    title: 'AssetEventAccessControl',
+    description: 'Access control settings for asset event consumer team 
filtering.'
+} as const;
+
 export const $AssetEventCollectionResponse = {
     properties: {
         asset_events: {
@@ -2338,6 +2366,16 @@ export const $CreateAssetEventsBody = {
             additionalProperties: true,
             type: 'object',
             title: 'Extra'
+        },
+        access_control: {
+            anyOf: [
+                {
+                    '$ref': '#/components/schemas/AssetEventAccessControl'
+                },
+                {
+                    type: 'null'
+                }
+            ]
         }
     },
     additionalProperties: false,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index aca86b11e05..fb93b5e093d 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -46,6 +46,14 @@ export type AssetCollectionResponse = {
     total_entries: number;
 };
 
+/**
+ * Access control settings for asset event consumer team filtering.
+ */
+export type AssetEventAccessControl = {
+    consumer_teams?: Array<(string)> | null;
+    allow_global?: boolean;
+};
+
 /**
  * Asset event collection response.
  */
@@ -691,6 +699,7 @@ export type CreateAssetEventsBody = {
     extra?: {
         [key: string]: unknown;
     };
+    access_control?: AssetEventAccessControl | null;
 };
 
 /**
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 6de32ee53c2..72224441031 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -47,6 +47,7 @@ from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.asserts import assert_queries_count
+from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import (
     clear_db_assets,
     clear_db_dag_bundles,
@@ -1395,8 +1396,6 @@ class TestPostAssetEvents(TestAssets):
 class TestPostAssetEventsTeamResolution(TestAssets):
     """Tests for team-based filtering in create_asset_event."""
 
-    _ROUTE = "airflow.api_fastapi.core_api.routes.public.assets"
-
     def _make_mock_event(self, asset):
         m = mock.MagicMock(
             spec=AssetEvent,
@@ -1421,27 +1420,20 @@ class TestPostAssetEventsTeamResolution(TestAssets):
     @pytest.mark.parametrize(
         ("multi_team", "expected_teams"),
         [
-            pytest.param(True, {"team_a", "team_b"}, id="enabled"),
-            pytest.param(False, set(), id="disabled"),
+            pytest.param("True", {"team_a", "team_b"}, id="enabled"),
+            pytest.param("False", set(), id="disabled"),
         ],
     )
-    def test_team_resolution(self, test_client, session, multi_team, 
expected_teams):
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.assets.asset_manager.register_asset_change")
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.assets.get_auth_manager")
+    def test_team_resolution(
+        self, mock_get_auth_manager, mock_register, test_client, session, 
multi_team, expected_teams
+    ):
         (asset,) = self.create_assets(num=1, session=session)
-        mock_auth_mgr = mock.MagicMock()
-        mock_auth_mgr.get_authorized_teams.return_value = {"team_a", "team_b"}
+        mock_get_auth_manager.return_value.get_authorized_teams.return_value = 
{"team_a", "team_b"}
+        mock_register.return_value = self._make_mock_event(asset)
 
-        with (
-            mock.patch(
-                f"{self._ROUTE}.conf.getboolean",
-                side_effect=lambda s, k, **kw: multi_team if k == "multi_team" 
else kw.get("fallback"),
-            ),
-            mock.patch(f"{self._ROUTE}.get_auth_manager", 
return_value=mock_auth_mgr),
-            mock.patch(
-                f"{self._ROUTE}.asset_manager.register_asset_change",
-                spec=True,
-                return_value=self._make_mock_event(asset),
-            ) as mock_register,
-        ):
+        with conf_vars({("core", "multi_team"): multi_team}):
             response = test_client.post("/assets/events", json={"asset_id": 
asset.id, "extra": {}})
 
         assert response.status_code == 200
@@ -1449,6 +1441,62 @@ class TestPostAssetEventsTeamResolution(TestAssets):
         assert call_kwargs["source_is_api"] is True
         assert call_kwargs["api_user_teams"] == expected_teams
 
+    @pytest.mark.usefixtures("time_freezer")
+    @pytest.mark.parametrize(
+        ("multi_team", "access_control", "expected_consumer_teams", 
"expected_allow_global"),
+        [
+            pytest.param(
+                "True",
+                {"consumer_teams": ["team_ml", "team_data"], "allow_global": 
False},
+                ["team_ml", "team_data"],
+                False,
+                id="multi_team_enabled_with_consumer_teams",
+            ),
+            pytest.param(
+                "True",
+                None,
+                None,
+                True,
+                id="multi_team_enabled_no_access_control",
+            ),
+            pytest.param(
+                "False",
+                {"consumer_teams": ["team_ml"], "allow_global": False},
+                None,
+                True,
+                id="multi_team_disabled_access_control_ignored",
+            ),
+        ],
+    )
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.assets.asset_manager.register_asset_change")
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.assets.get_auth_manager")
+    def test_access_control_consumer_teams(
+        self,
+        mock_get_auth_manager,
+        mock_register,
+        test_client,
+        session,
+        multi_team,
+        access_control,
+        expected_consumer_teams,
+        expected_allow_global,
+    ):
+        (asset,) = self.create_assets(num=1, session=session)
+        mock_get_auth_manager.return_value.get_authorized_teams.return_value = 
{"team_a"}
+        mock_register.return_value = self._make_mock_event(asset)
+
+        payload = {"asset_id": asset.id, "extra": {}}
+        if access_control is not None:
+            payload["access_control"] = access_control
+
+        with conf_vars({("core", "multi_team"): multi_team}):
+            response = test_client.post("/assets/events", json=payload)
+
+        assert response.status_code == 200
+        call_kwargs = mock_register.call_args.kwargs
+        assert call_kwargs["api_allow_consumer_teams"] == 
expected_consumer_teams
+        assert call_kwargs["api_allow_global_consumers"] == 
expected_allow_global
+
 
 @pytest.mark.need_serialized_dag
 class TestPostAssetMaterialize(TestAssets):
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 4aac0e9a6fa..4e13fd0fcb6 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -49,6 +49,18 @@ class AssetAliasResponse(BaseModel):
     group: Annotated[str, Field(title="Group")]
 
 
+class AssetEventAccessControl(BaseModel):
+    """
+    Access control settings for asset event consumer team filtering.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    consumer_teams: Annotated[list[str] | None, Field(title="Consumer Teams")] 
= None
+    allow_global: Annotated[bool | None, Field(title="Allow Global")] = True
+
+
 class AssetStoreWriterKind(str, Enum):
     """
     Identifies what kind of writer last updated an asset store entry.
@@ -415,6 +427,7 @@ class CreateAssetEventsBody(BaseModel):
     asset_id: Annotated[int, Field(title="Asset Id")]
     partition_key: Annotated[str | None, Field(title="Partition Key")] = None
     extra: Annotated[dict[str, Any] | None, Field(title="Extra")] = None
+    access_control: AssetEventAccessControl | None = None
 
 
 class DAGPatchBody(BaseModel):

Reply via email to