This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1974666b1cf Update serialization for producer side asset access 
control (#67658)
1974666b1cf is described below

commit 1974666b1cf6f5d67fdcbe5c390dff4e222ce951
Author: Vincent <[email protected]>
AuthorDate: Thu May 28 16:44:32 2026 -0700

    Update serialization for producer side asset access control (#67658)
---
 airflow-core/src/airflow/serialization/encoders.py |   3 +-
 .../unit/serialization/test_serialized_objects.py  | 107 ++++++++++++++++++++-
 2 files changed, 108 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/encoders.py 
b/airflow-core/src/airflow/serialization/encoders.py
index b9caea4cc37..85791cd781c 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -198,9 +198,10 @@ def encode_asset_like(a: BaseAsset | SerializedAssetBase) 
-> dict[str, Any]:
                     d["access_control"] = ac
             else:
                 # Asset stores access_control as an AssetAccessControl 
instance.
-                if ac.producer_teams or not ac.allow_global:
+                if ac.producer_teams or ac.consumer_teams or not 
ac.allow_global:
                     d["access_control"] = {
                         "producer_teams": ac.producer_teams,
+                        "consumer_teams": ac.consumer_teams,
                         "allow_global": ac.allow_global,
                     }
             return d
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py 
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 6d558628e74..c6d1fe0be80 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -770,7 +770,47 @@ def test_encode_asset_with_access_control():
         access_control=AssetAccessControl(producer_teams=["team_a"], 
allow_global=False),
     )
     encoded = encode_asset_like(asset)
-    assert encoded["access_control"] == {"producer_teams": ["team_a"], 
"allow_global": False}
+    assert encoded["access_control"] == {
+        "producer_teams": ["team_a"],
+        "consumer_teams": [],
+        "allow_global": False,
+    }
+
+
+def test_encode_asset_with_consumer_teams():
+    from airflow.sdk import Asset, AssetAccessControl
+    from airflow.serialization.encoders import encode_asset_like
+
+    asset = Asset(
+        name="test",
+        uri="s3://bucket/key",
+        access_control=AssetAccessControl(consumer_teams=["team_ml", 
"team_data"]),
+    )
+    encoded = encode_asset_like(asset)
+    assert encoded["access_control"] == {
+        "producer_teams": [],
+        "consumer_teams": ["team_ml", "team_data"],
+        "allow_global": True,
+    }
+
+
+def test_encode_asset_with_both_producer_and_consumer_teams():
+    from airflow.sdk import Asset, AssetAccessControl
+    from airflow.serialization.encoders import encode_asset_like
+
+    asset = Asset(
+        name="test",
+        uri="s3://bucket/key",
+        access_control=AssetAccessControl(
+            producer_teams=["team_a"], consumer_teams=["team_b"], 
allow_global=False
+        ),
+    )
+    encoded = encode_asset_like(asset)
+    assert encoded["access_control"] == {
+        "producer_teams": ["team_a"],
+        "consumer_teams": ["team_b"],
+        "allow_global": False,
+    }
 
 
 def test_encode_asset_without_access_control_omits_key():
@@ -799,6 +839,31 @@ def test_decode_asset_with_access_control():
     assert decoded.access_control == {"producer_teams": ["team_a"], 
"allow_global": False}
 
 
+def test_decode_asset_with_consumer_teams():
+    from airflow.serialization.decoders import decode_asset_like
+
+    decoded = decode_asset_like(
+        {
+            "__type": "asset",
+            "name": "test",
+            "uri": "s3://bucket/key",
+            "group": "asset",
+            "extra": {},
+            "watchers": [],
+            "access_control": {
+                "producer_teams": ["team_a"],
+                "consumer_teams": ["team_ml"],
+                "allow_global": False,
+            },
+        }
+    )
+    assert decoded.access_control == {
+        "producer_teams": ["team_a"],
+        "consumer_teams": ["team_ml"],
+        "allow_global": False,
+    }
+
+
 def test_decode_asset_defaults_access_control_to_empty_dict():
     from airflow.serialization.decoders import decode_asset_like
 
@@ -815,6 +880,46 @@ def 
test_decode_asset_defaults_access_control_to_empty_dict():
     assert decoded.access_control == {}
 
 
+def test_access_control_round_trip_with_consumer_teams():
+    from airflow.sdk import Asset, AssetAccessControl
+    from airflow.serialization.decoders import decode_asset_like
+    from airflow.serialization.encoders import encode_asset_like
+
+    asset = Asset(
+        name="test",
+        uri="s3://bucket/key",
+        access_control=AssetAccessControl(
+            producer_teams=["team_a"], consumer_teams=["team_ml", 
"team_data"], allow_global=False
+        ),
+    )
+    encoded = encode_asset_like(asset)
+    decoded = decode_asset_like(encoded)
+
+    assert decoded.access_control == {
+        "producer_teams": ["team_a"],
+        "consumer_teams": ["team_ml", "team_data"],
+        "allow_global": False,
+    }
+
+
+def test_access_control_round_trip_consumer_teams_only():
+    from airflow.sdk import Asset, AssetAccessControl
+    from airflow.serialization.decoders import decode_asset_like
+    from airflow.serialization.encoders import encode_asset_like
+
+    asset = Asset(
+        name="test",
+        uri="s3://bucket/key",
+        access_control=AssetAccessControl(consumer_teams=["team_ml"]),
+    )
+    encoded = encode_asset_like(asset)
+    decoded = decode_asset_like(encoded)
+
+    assert decoded.access_control["consumer_teams"] == ["team_ml"]
+    assert decoded.access_control["producer_teams"] == []
+    assert decoded.access_control["allow_global"] is True
+
+
 def test_encode_timezone():
     from airflow.serialization.serialized_objects import encode_timezone
 

Reply via email to