This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ba36e029c58 Add `consumer_teams` to `AssetAccessControl` in task-sdk (#67625)
ba36e029c58 is described below
commit ba36e029c584de6e20640a0b640aa8f0242443c5
Author: Vincent <[email protected]>
AuthorDate: Thu May 28 09:42:33 2026 -0700
Add `consumer_teams` to `AssetAccessControl` in task-sdk (#67625)
---
.../sdk/definitions/asset/access_control.py | 10 ++++--
.../definitions/test_asset_access_control.py | 37 +++++++++++++++++++++-
2 files changed, 43 insertions(+), 4 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
index 04c218bfdb6..e642c14f0c5 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
@@ -19,10 +19,10 @@ from __future__ import annotations
import attrs
-def _validate_producer_teams(instance, attribute, value):
+def _validate_teams(instance, attribute, value):
for entry in value:
if not isinstance(entry, str) or not entry or entry.isspace():
- raise ValueError("Each entry in producer_teams must be a non-empty string")
+ raise ValueError(f"Each entry in {attribute.name} must be a non-empty string")
return value
@@ -32,6 +32,10 @@ class AssetAccessControl:
producer_teams: list[str] = attrs.field(
factory=list,
- validator=[_validate_producer_teams],
+ validator=[_validate_teams],
+ )
+ consumer_teams: list[str] = attrs.field(
+ factory=list,
+ validator=[_validate_teams],
)
allow_global: bool = attrs.field(default=True, validator=[attrs.validators.instance_of(bool)])
diff --git a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
index 8524d8b0ccb..86e4f916aa6 100644
--- a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
+++ b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
@@ -25,11 +25,17 @@ class TestAssetAccessControl:
def test_defaults(self):
ac = AssetAccessControl()
assert ac.producer_teams == []
+ assert ac.consumer_teams == []
assert ac.allow_global is True
def test_explicit_values(self):
- ac = AssetAccessControl(producer_teams=["team_a", "team_b"], allow_global=False)
+ ac = AssetAccessControl(
+ producer_teams=["team_a", "team_b"],
+ consumer_teams=["team_c"],
+ allow_global=False,
+ )
assert ac.producer_teams == ["team_a", "team_b"]
+ assert ac.consumer_teams == ["team_c"]
assert ac.allow_global is False
@pytest.mark.parametrize(
@@ -47,6 +53,21 @@ class TestAssetAccessControl:
with pytest.raises(ValueError, match="producer_teams"):
AssetAccessControl(producer_teams=teams)
+ @pytest.mark.parametrize(
+ "teams",
+ [
+ [""],
+ [123],
+ [None],
+ [True],
+ [{}],
+ ["team_a", " ", "team_b"],
+ ],
+ )
+ def test_rejects_invalid_consumer_teams(self, teams):
+ with pytest.raises(ValueError, match="consumer_teams"):
+ AssetAccessControl(consumer_teams=teams)
+
@pytest.mark.parametrize(
"teams",
[
@@ -61,6 +82,20 @@ class TestAssetAccessControl:
ac = AssetAccessControl(producer_teams=teams)
assert ac.producer_teams == teams
+ @pytest.mark.parametrize(
+ "teams",
+ [
+ [],
+ ["team_a"],
+ ["team_a", "team_b"],
+ ["team-with-dashes"],
+ ["team_with_underscores"],
+ ],
+ )
+ def test_accepts_valid_consumer_teams(self, teams):
+ ac = AssetAccessControl(consumer_teams=teams)
+ assert ac.consumer_teams == teams
+
def test_allow_global_must_be_bool(self):
with pytest.raises(TypeError):
AssetAccessControl(allow_global="yes")