This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ba36e029c58 Add `consumer_teams` to `AssetAccessControl` in task-sdk (#67625)
ba36e029c58 is described below

commit ba36e029c584de6e20640a0b640aa8f0242443c5
Author: Vincent <[email protected]>
AuthorDate: Thu May 28 09:42:33 2026 -0700

    Add `consumer_teams` to `AssetAccessControl` in task-sdk (#67625)
---
 .../sdk/definitions/asset/access_control.py        | 10 ++++--
 .../definitions/test_asset_access_control.py       | 37 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
index 04c218bfdb6..e642c14f0c5 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
@@ -19,10 +19,10 @@ from __future__ import annotations
 import attrs
 
 
-def _validate_producer_teams(instance, attribute, value):
+def _validate_teams(instance, attribute, value):
     for entry in value:
         if not isinstance(entry, str) or not entry or entry.isspace():
-            raise ValueError("Each entry in producer_teams must be a non-empty string")
+            raise ValueError(f"Each entry in {attribute.name} must be a non-empty string")
     return value
 
 
@@ -32,6 +32,10 @@ class AssetAccessControl:
 
     producer_teams: list[str] = attrs.field(
         factory=list,
-        validator=[_validate_producer_teams],
+        validator=[_validate_teams],
+    )
+    consumer_teams: list[str] = attrs.field(
+        factory=list,
+        validator=[_validate_teams],
     )
     allow_global: bool = attrs.field(default=True, validator=[attrs.validators.instance_of(bool)])
diff --git a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
index 8524d8b0ccb..86e4f916aa6 100644
--- a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
+++ b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
@@ -25,11 +25,17 @@ class TestAssetAccessControl:
     def test_defaults(self):
         ac = AssetAccessControl()
         assert ac.producer_teams == []
+        assert ac.consumer_teams == []
         assert ac.allow_global is True
 
     def test_explicit_values(self):
-        ac = AssetAccessControl(producer_teams=["team_a", "team_b"], allow_global=False)
+        ac = AssetAccessControl(
+            producer_teams=["team_a", "team_b"],
+            consumer_teams=["team_c"],
+            allow_global=False,
+        )
         assert ac.producer_teams == ["team_a", "team_b"]
+        assert ac.consumer_teams == ["team_c"]
         assert ac.allow_global is False
 
     @pytest.mark.parametrize(
@@ -47,6 +53,21 @@ class TestAssetAccessControl:
         with pytest.raises(ValueError, match="producer_teams"):
             AssetAccessControl(producer_teams=teams)
 
+    @pytest.mark.parametrize(
+        "teams",
+        [
+            [""],
+            [123],
+            [None],
+            [True],
+            [{}],
+            ["team_a", "  ", "team_b"],
+        ],
+    )
+    def test_rejects_invalid_consumer_teams(self, teams):
+        with pytest.raises(ValueError, match="consumer_teams"):
+            AssetAccessControl(consumer_teams=teams)
+
     @pytest.mark.parametrize(
         "teams",
         [
@@ -61,6 +82,20 @@ class TestAssetAccessControl:
         ac = AssetAccessControl(producer_teams=teams)
         assert ac.producer_teams == teams
 
+    @pytest.mark.parametrize(
+        "teams",
+        [
+            [],
+            ["team_a"],
+            ["team_a", "team_b"],
+            ["team-with-dashes"],
+            ["team_with_underscores"],
+        ],
+    )
+    def test_accepts_valid_consumer_teams(self, teams):
+        ac = AssetAccessControl(consumer_teams=teams)
+        assert ac.consumer_teams == teams
+
     def test_allow_global_must_be_bool(self):
         with pytest.raises(TypeError):
             AssetAccessControl(allow_global="yes")

Reply via email to