This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1974666b1cf Update serialization for producer side asset access
control (#67658)
1974666b1cf is described below
commit 1974666b1cf6f5d67fdcbe5c390dff4e222ce951
Author: Vincent <[email protected]>
AuthorDate: Thu May 28 16:44:32 2026 -0700
Update serialization for producer side asset access control (#67658)
---
airflow-core/src/airflow/serialization/encoders.py | 3 +-
.../unit/serialization/test_serialized_objects.py | 107 ++++++++++++++++++++-
2 files changed, 108 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/encoders.py
b/airflow-core/src/airflow/serialization/encoders.py
index b9caea4cc37..85791cd781c 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -198,9 +198,10 @@ def encode_asset_like(a: BaseAsset | SerializedAssetBase)
-> dict[str, Any]:
d["access_control"] = ac
else:
# Asset stores access_control as an AssetAccessControl
instance.
- if ac.producer_teams or not ac.allow_global:
+ if ac.producer_teams or ac.consumer_teams or not
ac.allow_global:
d["access_control"] = {
"producer_teams": ac.producer_teams,
+ "consumer_teams": ac.consumer_teams,
"allow_global": ac.allow_global,
}
return d
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 6d558628e74..c6d1fe0be80 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -770,7 +770,47 @@ def test_encode_asset_with_access_control():
access_control=AssetAccessControl(producer_teams=["team_a"],
allow_global=False),
)
encoded = encode_asset_like(asset)
- assert encoded["access_control"] == {"producer_teams": ["team_a"],
"allow_global": False}
+ assert encoded["access_control"] == {
+ "producer_teams": ["team_a"],
+ "consumer_teams": [],
+ "allow_global": False,
+ }
+
+
+def test_encode_asset_with_consumer_teams():
+ from airflow.sdk import Asset, AssetAccessControl
+ from airflow.serialization.encoders import encode_asset_like
+
+ asset = Asset(
+ name="test",
+ uri="s3://bucket/key",
+ access_control=AssetAccessControl(consumer_teams=["team_ml",
"team_data"]),
+ )
+ encoded = encode_asset_like(asset)
+ assert encoded["access_control"] == {
+ "producer_teams": [],
+ "consumer_teams": ["team_ml", "team_data"],
+ "allow_global": True,
+ }
+
+
+def test_encode_asset_with_both_producer_and_consumer_teams():
+ from airflow.sdk import Asset, AssetAccessControl
+ from airflow.serialization.encoders import encode_asset_like
+
+ asset = Asset(
+ name="test",
+ uri="s3://bucket/key",
+ access_control=AssetAccessControl(
+ producer_teams=["team_a"], consumer_teams=["team_b"],
allow_global=False
+ ),
+ )
+ encoded = encode_asset_like(asset)
+ assert encoded["access_control"] == {
+ "producer_teams": ["team_a"],
+ "consumer_teams": ["team_b"],
+ "allow_global": False,
+ }
def test_encode_asset_without_access_control_omits_key():
@@ -799,6 +839,31 @@ def test_decode_asset_with_access_control():
assert decoded.access_control == {"producer_teams": ["team_a"],
"allow_global": False}
+def test_decode_asset_with_consumer_teams():
+ from airflow.serialization.decoders import decode_asset_like
+
+ decoded = decode_asset_like(
+ {
+ "__type": "asset",
+ "name": "test",
+ "uri": "s3://bucket/key",
+ "group": "asset",
+ "extra": {},
+ "watchers": [],
+ "access_control": {
+ "producer_teams": ["team_a"],
+ "consumer_teams": ["team_ml"],
+ "allow_global": False,
+ },
+ }
+ )
+ assert decoded.access_control == {
+ "producer_teams": ["team_a"],
+ "consumer_teams": ["team_ml"],
+ "allow_global": False,
+ }
+
+
def test_decode_asset_defaults_access_control_to_empty_dict():
from airflow.serialization.decoders import decode_asset_like
@@ -815,6 +880,46 @@ def
test_decode_asset_defaults_access_control_to_empty_dict():
assert decoded.access_control == {}
+def test_access_control_round_trip_with_consumer_teams():
+ from airflow.sdk import Asset, AssetAccessControl
+ from airflow.serialization.decoders import decode_asset_like
+ from airflow.serialization.encoders import encode_asset_like
+
+ asset = Asset(
+ name="test",
+ uri="s3://bucket/key",
+ access_control=AssetAccessControl(
+ producer_teams=["team_a"], consumer_teams=["team_ml",
"team_data"], allow_global=False
+ ),
+ )
+ encoded = encode_asset_like(asset)
+ decoded = decode_asset_like(encoded)
+
+ assert decoded.access_control == {
+ "producer_teams": ["team_a"],
+ "consumer_teams": ["team_ml", "team_data"],
+ "allow_global": False,
+ }
+
+
+def test_access_control_round_trip_consumer_teams_only():
+ from airflow.sdk import Asset, AssetAccessControl
+ from airflow.serialization.decoders import decode_asset_like
+ from airflow.serialization.encoders import encode_asset_like
+
+ asset = Asset(
+ name="test",
+ uri="s3://bucket/key",
+ access_control=AssetAccessControl(consumer_teams=["team_ml"]),
+ )
+ encoded = encode_asset_like(asset)
+ decoded = decode_asset_like(encoded)
+
+ assert decoded.access_control["consumer_teams"] == ["team_ml"]
+ assert decoded.access_control["producer_teams"] == []
+ assert decoded.access_control["allow_global"] is True
+
+
def test_encode_timezone():
from airflow.serialization.serialized_objects import encode_timezone